307 lines
7.4 KiB
Go
307 lines
7.4 KiB
Go
package pmuxlib
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
// ProcessConfig is used to configure a process.
|
|
type ProcessConfig struct {
|
|
// Cmd and Args describe the actual process to run.
|
|
Cmd string `yaml:"cmd"`
|
|
Args []string `yaml:"args"`
|
|
|
|
// Env describes the environment variables to set on the process.
|
|
Env map[string]string `yaml:"env"`
|
|
|
|
// Dir is the directory the process will be run in. If not set then the
|
|
// process is run in the same directory as this parent process.
|
|
Dir string `yaml:"dir"`
|
|
|
|
// MinWait and MaxWait are the minimum and maximum amount of time between
|
|
// restarts that Process will wait.
|
|
//
|
|
// MinWait defaults to 1 second.
|
|
// MaxWait defaults to 64 seconds.
|
|
MinWait time.Duration `yaml:"minWait"`
|
|
MaxWait time.Duration `yaml:"maxWait"`
|
|
|
|
// SigKillWait is the amount of time after the process is sent a SIGINT
|
|
// before a SIGKILL is sent.
|
|
//
|
|
// Defalts to 10 seconds.
|
|
SigKillWait time.Duration `yaml:"sigKillWait"`
|
|
|
|
// NoRestartOn indicates which exit codes should result in the process not
|
|
// being restarted any further.
|
|
NoRestartOn []int `yaml:"noRestartOn"`
|
|
|
|
// StartAfterFunc will cause the starting of the process to be blocked until
|
|
// this function returns. If used with RunProcessOnce and an error is
|
|
// returned, RunProcessOnce will return that error.
|
|
StartAfterFunc func(context.Context) error
|
|
|
|
// Group can be used to control the order and grouping of processes as they
|
|
// shut down.
|
|
//
|
|
// Processes will not be shut down until all processes with a higher group
|
|
// number are already shut down. Processes with the same group number will
|
|
// be shut down simultaneously.
|
|
Group int `yaml:"group"`
|
|
|
|
// Where to send stdout, stderr log lines, and lines from pmux itself.
|
|
StdoutLogger Logger `yaml:"-"`
|
|
StderrLogger Logger `yaml:"-"`
|
|
SysLogger Logger `yaml:"-"`
|
|
}
|
|
|
|
func (cfg ProcessConfig) withDefaults() ProcessConfig {
|
|
|
|
if cfg.MinWait == 0 {
|
|
cfg.MinWait = 1 * time.Second
|
|
}
|
|
|
|
if cfg.MaxWait == 0 {
|
|
cfg.MaxWait = 64 * time.Second
|
|
}
|
|
|
|
if cfg.SigKillWait == 0 {
|
|
cfg.SigKillWait = 10 * time.Second
|
|
}
|
|
|
|
return cfg
|
|
}
|
|
|
|
func sigProcessGroup(sysLogger Logger, proc *os.Process, sig syscall.Signal) {
|
|
sysLogger.Printf("sending %v signal", sig)
|
|
|
|
// Because we use Setpgid when starting child processes, child processes
|
|
// will have the same PGID as their PID. To send a signal to all processes
|
|
// in a group, you send the signal to the negation of the PGID, which in
|
|
// this case is equivalent to -PID.
|
|
//
|
|
// POSIX is a fucking joke.
|
|
if err := syscall.Kill(-proc.Pid, sig); err != nil {
|
|
|
|
panic(fmt.Errorf(
|
|
"failed to send %v signal to %d: %w",
|
|
sig, -proc.Pid, err,
|
|
))
|
|
}
|
|
}
|
|
|
|
// RunProcessOnce runs the process described by the ProcessConfig (though it
|
|
// doesn't use all fields from the ProcessConfig).
|
|
//
|
|
// The process is killed if-and-only-if the context is canceled, returning -1
|
|
// and context.Canceled. Otherwise the exit status of the process is returned,
|
|
// or -1 and an error.
|
|
//
|
|
// The stdout and stderr of the process will be written to the corresponding
|
|
// Loggers. Various runtime events will be written to the sysLogger.
|
|
func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {
|
|
|
|
cfg = cfg.withDefaults()
|
|
|
|
if cfg.StartAfterFunc != nil {
|
|
if err := cfg.StartAfterFunc(ctx); err != nil {
|
|
return -1, err
|
|
}
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
fwdOutPipe := func(logger Logger, r io.Reader) {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
bufR := bufio.NewReader(r)
|
|
for {
|
|
line, err := bufR.ReadString('\n')
|
|
if errors.Is(err, io.EOF) {
|
|
return
|
|
} else if err != nil {
|
|
logger.Printf("reading output: %v", err)
|
|
return
|
|
}
|
|
|
|
logger.Println(strings.TrimSuffix(line, "\n"))
|
|
}
|
|
}()
|
|
}
|
|
|
|
cmd := exec.Command(cfg.Cmd, cfg.Args...)
|
|
|
|
cmd.Dir = cfg.Dir
|
|
|
|
cmd.SysProcAttr = &syscall.SysProcAttr{
|
|
// Indicates that the child process should be a part of a separate
|
|
// process group than the parent, so that it does not receive signals
|
|
// that the parent receives. This is what ensures that context
|
|
// cancellation is the only way to interrupt the child processes.
|
|
Setpgid: true,
|
|
}
|
|
|
|
cmd.Env = os.Environ()
|
|
for k, v := range cfg.Env {
|
|
cmd.Env = append(cmd.Env, k+"="+v)
|
|
}
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return -1, fmt.Errorf("getting stdout pipe: %w", err)
|
|
}
|
|
defer stdout.Close()
|
|
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
return -1, fmt.Errorf("getting stderr pipe: %w", err)
|
|
}
|
|
defer stderr.Close()
|
|
|
|
fwdOutPipe(cfg.StdoutLogger, stdout)
|
|
fwdOutPipe(cfg.StderrLogger, stderr)
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
return -1, fmt.Errorf("starting process: %w", err)
|
|
}
|
|
|
|
stopCh := make(chan struct{})
|
|
|
|
go func(proc *os.Process) {
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
sigProcessGroup(cfg.SysLogger, proc, syscall.SIGINT)
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-time.After(cfg.SigKillWait):
|
|
sigProcessGroup(cfg.SysLogger, proc, syscall.SIGKILL)
|
|
case <-stopCh:
|
|
}
|
|
|
|
}(cmd.Process)
|
|
|
|
wg.Wait()
|
|
|
|
err = cmd.Wait()
|
|
close(stopCh)
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
if err != nil {
|
|
return -1, fmt.Errorf("process exited: %w", err)
|
|
}
|
|
|
|
return cmd.ProcessState.ExitCode(), nil
|
|
}
|
|
|
|
// Process is used to manage a running process. Methods on a Process are _not_
|
|
// thread-safe.
|
|
//
|
|
// Stop must be called on a Process before the program has exited, or there may
|
|
// be a leftover zombie child process.
|
|
//
|
|
// The process will be restarted if it exits of its own accord. There will be a
|
|
// brief wait time between each restart, with an exponential backoff mechanism
|
|
// so that the wait time increases upon repeated restarts.
|
|
//
|
|
// The stdout and stderr of the process will be written to the corresponding
|
|
// Loggers. Various runtime events will be written to the sysLogger.
|
|
type Process struct {
|
|
cfg ProcessConfig
|
|
stopFn context.CancelFunc
|
|
doneCh chan struct{}
|
|
}
|
|
|
|
// NewProcess returns a new Process instance based on the given config.
|
|
func NewProcess(cfg ProcessConfig) *Process {
|
|
p := &Process{
|
|
cfg: cfg.withDefaults(),
|
|
}
|
|
p.run()
|
|
return p
|
|
}
|
|
|
|
func (p *Process) run() {
|
|
var ctx context.Context
|
|
ctx, p.stopFn = context.WithCancel(context.Background())
|
|
p.doneCh = make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(p.doneCh)
|
|
p.restartLoop(ctx)
|
|
}()
|
|
}
|
|
|
|
func (p *Process) restartLoop(ctx context.Context) {
|
|
var wait time.Duration
|
|
|
|
for {
|
|
start := time.Now()
|
|
exitCode, err := RunProcessOnce(ctx, p.cfg)
|
|
took := time.Since(start)
|
|
|
|
// TODO check if error was due to StartAfterFunc, change the log if so.
|
|
if err != nil {
|
|
p.cfg.SysLogger.Printf("exited: %v", err)
|
|
} else {
|
|
p.cfg.SysLogger.Printf("exit code: %d", exitCode)
|
|
}
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return
|
|
}
|
|
|
|
for i := range p.cfg.NoRestartOn {
|
|
if p.cfg.NoRestartOn[i] == exitCode {
|
|
return
|
|
}
|
|
}
|
|
|
|
wait = ((wait * 2) - took).Truncate(time.Millisecond)
|
|
|
|
if wait < p.cfg.MinWait {
|
|
wait = p.cfg.MinWait
|
|
} else if wait > p.cfg.MaxWait {
|
|
wait = p.cfg.MaxWait
|
|
}
|
|
|
|
p.cfg.SysLogger.Printf("will restart process in %v", wait)
|
|
|
|
select {
|
|
case <-time.After(wait):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Restart will block until the currently running child process has been killed
|
|
// and a new one has been spawned.
|
|
func (p *Process) Restart() {
|
|
p.Stop()
|
|
p.run()
|
|
}
|
|
|
|
// Stop will block until the child process has been killed. The Process should
|
|
// not be used again after this.
|
|
func (p *Process) Stop() {
|
|
p.stopFn()
|
|
<-p.doneCh
|
|
}
|