commit fe90e93fad52807f46c0fad43b70c0ec69a22e04 Author: Brian Picciano Date: Tue Sep 21 16:36:50 2021 -0600 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2fab3aa --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +pmux diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..5a8e332 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,14 @@ + DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE + Version 2, December 2004 + + Copyright (C) 2004 Sam Hocevar + + Everyone is permitted to copy and distribute verbatim or modified + copies of this license document, and changing it is allowed as long + as the name is changed. + + DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. You just DO WHAT THE FUCK YOU WANT TO. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..6338b99 --- /dev/null +++ b/README.md @@ -0,0 +1,78 @@ +# pmux + +A dumb simple user-space process manager, for use in composing multiple +processes together into a single runable frontend. + +Features include (and are limited to): + +* Coalesces all stdout and stderr streams of all sub-processes into a single + stdout stream (with timestamps and process names prefixing each line). + +* Propagates interrupt signal to sub-processes, and waits a configurable amount + of time before SIGKILLing those which don't exit themselves. + +* Will restart processes which unexpectedly exit, with an exponential backoff + delay for those which repeatedly exit. + +* Configurable timestamp format. + +That's it. If it's not listed then pmux can't do it. + +## Usage + +To build you just `go build .` within the directory. + +To run you do `pmux -c pmux.yml`. If `-c` isn't provided then pmux will look for +`pmux.yml` in the pwd. A config file is required. + +## Example + +This repo contains [an example config file](pmux-example.yml), which shows off +all possible configuration options. + +The output the stream from this example config looks something like this: + +``` +2021-09-21T16:32:48.513-06:00 | stubborn-pinger | starting process +2021-09-21T16:32:48.513-06:00 | pinger | starting process +2021-09-21T16:32:48.532-06:00 > pinger > PING example.com (93.184.216.34) 56(84) bytes of data. +2021-09-21T16:32:48.532-06:00 > pinger > 64 bytes from 93.184.216.34 (93.184.216.34): icmp_seq=1 ttl=55 time=14.1 ms +2021-09-21T16:32:48.532-06:00 > pinger > +2021-09-21T16:32:48.532-06:00 > pinger > --- example.com ping statistics --- +2021-09-21T16:32:48.532-06:00 > pinger > 1 packets transmitted, 1 received, 0% packet loss, time 0ms +2021-09-21T16:32:48.532-06:00 > pinger > rtt min/avg/max/mdev = 14.091/14.091/14.091/0.000 ms +2021-09-21T16:32:48.532-06:00 > stubborn-pinger > PING example.com (93.184.216.34) 56(84) bytes of data. +2021-09-21T16:32:48.532-06:00 > stubborn-pinger > 64 bytes from 93.184.216.34 (93.184.216.34): icmp_seq=1 ttl=55 time=14.2 ms +2021-09-21T16:32:48.532-06:00 > stubborn-pinger > +2021-09-21T16:32:48.532-06:00 > stubborn-pinger > --- example.com ping statistics --- +2021-09-21T16:32:48.532-06:00 > stubborn-pinger > 1 packets transmitted, 1 received, 0% packet loss, time 0ms +2021-09-21T16:32:48.532-06:00 > stubborn-pinger > rtt min/avg/max/mdev = 14.154/14.154/14.154/0.000 ms +2021-09-21T16:32:49.548-06:00 > pinger > PING example.com (93.184.216.34) 56(84) bytes of data. +2021-09-21T16:32:49.548-06:00 > pinger > 64 bytes from 93.184.216.34 (93.184.216.34): icmp_seq=1 ttl=55 time=10.5 ms +2021-09-21T16:32:49.548-06:00 > pinger > +2021-09-21T16:32:49.548-06:00 > pinger > --- example.com ping statistics --- +2021-09-21T16:32:49.548-06:00 > pinger > 1 packets transmitted, 1 received, 0% packet loss, time 0ms +2021-09-21T16:32:49.548-06:00 > pinger > rtt min/avg/max/mdev = 10.451/10.451/10.451/0.000 ms +2021-09-21T16:32:49.553-06:00 > stubborn-pinger > PING example.com (93.184.216.34) 56(84) bytes of data. +2021-09-21T16:32:49.553-06:00 > stubborn-pinger > 64 bytes from 93.184.216.34 (93.184.216.34): icmp_seq=1 ttl=55 time=15.3 ms +2021-09-21T16:32:49.553-06:00 > stubborn-pinger > +2021-09-21T16:32:49.553-06:00 > stubborn-pinger > --- example.com ping statistics --- +2021-09-21T16:32:49.553-06:00 > stubborn-pinger > 1 packets transmitted, 1 received, 0% packet loss, time 0ms + +... + +^C2021-09-21T16:32:50.894-06:00 | pmux | interrupt signal received, killing all sub-processes +2021-09-21T16:32:50.895-06:00 > stubborn-pinger > i will never stop, you will have to SIGKILL me! +2021-09-21T16:32:50.895-06:00 | pinger | process exited: signal: interrupt +2021-09-21T16:32:50.895-06:00 | pinger | stopped process handler +2021-09-21T16:32:50.910-06:00 > stubborn-pinger > PING example.com (93.184.216.34) 56(84) bytes of data. +2021-09-21T16:32:50.910-06:00 > stubborn-pinger > 64 bytes from 93.184.216.34 (93.184.216.34): icmp_seq=1 ttl=55 time=11.4 ms +2021-09-21T16:32:50.910-06:00 > stubborn-pinger > +2021-09-21T16:32:50.910-06:00 > stubborn-pinger > --- example.com ping statistics --- +2021-09-21T16:32:50.910-06:00 > stubborn-pinger > 1 packets transmitted, 1 received, 0% packet loss, time 0ms +2021-09-21T16:32:50.910-06:00 > stubborn-pinger > rtt min/avg/max/mdev = 11.369/11.369/11.369/0.000 ms +2021-09-21T16:32:51.895-06:00 | stubborn-pinger | forcefully killing process +2021-09-21T16:32:51.912-06:00 | stubborn-pinger | process exited: signal: killed +2021-09-21T16:32:51.912-06:00 | stubborn-pinger | stopped process handler +2021-09-21T16:32:51.912-06:00 | pmux | exited gracefully, ciao! +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..b41cd33 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module pmux + +go 1.16 + +require gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7534661 --- /dev/null +++ b/go.sum @@ -0,0 +1,3 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go new file mode 100644 index 0000000..f434539 --- /dev/null +++ b/main.go @@ -0,0 +1,357 @@ +package main + +import ( + "bufio" + "context" + "errors" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + "gopkg.in/yaml.v2" +) + +// pname used by pmux itself for logging. +const pmuxPName = "pmux" + +// characters used to denote different kinds of logs +const ( + logSepProcOut = '>' + logSepSys = '|' +) + +type logger struct { + timeFmt string + maxPNameLen int + + l sync.Mutex + out io.Writer + outBuf *bufio.Writer + + wg sync.WaitGroup + closeCh chan struct{} +} + +func newLogger( + out io.Writer, + timeFmt string, + possiblePNames []string, +) *logger { + + maxPNameLen := 0 + for _, pname := range possiblePNames { + if l := len(pname); l > maxPNameLen { + maxPNameLen = l + } + } + + maxPNameLen++ + + l := &logger{ + timeFmt: timeFmt, + maxPNameLen: maxPNameLen, + out: out, + outBuf: bufio.NewWriter(out), + closeCh: make(chan struct{}), + } + + l.wg.Add(1) + go func() { + defer l.wg.Done() + l.flusher() + }() + + return l +} + +func (l *logger) Close() { + + l.l.Lock() + defer l.l.Unlock() + + close(l.closeCh) + l.wg.Wait() + + l.outBuf.Flush() + + if syncer, ok := l.out.(interface{ Sync() error }); ok { + syncer.Sync() + } else if flusher, ok := l.out.(interface{ Flush() error }); ok { + flusher.Flush() + } + + // this generally shouldn't be necessary, but we could run into cases (e.g. + // during a force-kill) where further Prints are called after a Close. These + // should just do nothing. + l.out = ioutil.Discard + l.outBuf = bufio.NewWriter(l.out) +} + +func (l *logger) flusher() { + ticker := time.NewTicker(250 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + l.l.Lock() + l.outBuf.Flush() + l.l.Unlock() + case <-l.closeCh: + return + } + } +} + +func (l *logger) Println(pname string, sep rune, line string) { + + l.l.Lock() + defer l.l.Unlock() + + fmt.Fprintf( + l.outBuf, + "%s %c %s%s%c %s\n", + time.Now().Format(l.timeFmt), + sep, + pname, + strings.Repeat(" ", l.maxPNameLen-len(pname)), + sep, + line, + ) +} + +func (l *logger) Printf(pname string, sep rune, msg string, args ...interface{}) { + l.Println(pname, sep, fmt.Sprintf(msg, args...)) +} + +//////////////////////////////////////////////////////////////////////////////// + +type configProcess struct { + Name string `yaml:"name"` + Cmd string `yaml:"cmd"` + Args []string `yaml:"args"` + Env map[string]string `yaml:"env"` + MinWait time.Duration `yaml:"minWait"` + MaxWait time.Duration `yaml:"maxWait"` + SigKillWait time.Duration `yaml:"sigKillWait"` +} + +func runProcessOnce( + ctx context.Context, logger *logger, cfgProc configProcess, +) error { + + var wg sync.WaitGroup + + fwdOutPipe := func(r io.Reader) { + wg.Add(1) + go func() { + defer wg.Done() + bufR := bufio.NewReader(r) + for { + line, err := bufR.ReadString('\n') + if errors.Is(err, io.EOF) { + return + } else if err != nil { + logger.Printf(cfgProc.Name, logSepSys, "reading output: %v", err) + return + } + + logger.Println(cfgProc.Name, logSepProcOut, strings.TrimSuffix(line, "\n")) + } + }() + } + + cmd := exec.Command(cfgProc.Cmd, cfgProc.Args...) + + cmd.Env = make([]string, 0, len(cfgProc.Env)) + for k, v := range cfgProc.Env { + cmd.Env = append(cmd.Env, k+"="+v) + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("getting stdout pipe: %w", err) + } + defer stdout.Close() + + stderr, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("getting stderr pipe: %w", err) + } + defer stderr.Close() + + fwdOutPipe(stdout) + fwdOutPipe(stderr) + + if err := cmd.Start(); err != nil { + return fmt.Errorf("starting process: %w", err) + } + + // go-routine which will sent interrupt if the context is cancelled. Also + // waits on a secondary channel, which is closed when this function returns, + // in order to ensure this go-routine always gets cleaned up. + stopCh := make(chan struct{}) + defer close(stopCh) + go func(proc *os.Process) { + select { + case <-ctx.Done(): + proc.Signal(os.Interrupt) + case <-stopCh: + return + } + + select { + case <-time.After(cfgProc.SigKillWait): + logger.Println(cfgProc.Name, logSepSys, "forcefully killing process") + proc.Signal(os.Kill) + case <-stopCh: + return + } + }(cmd.Process) + + wg.Wait() + + if err := cmd.Wait(); err != nil { + return fmt.Errorf("process exited: %w", err) + } + + return nil +} + +func runProcess( + ctx context.Context, logger *logger, cfgProc configProcess, +) { + + var wait time.Duration + + for { + logger.Println(cfgProc.Name, logSepSys, "starting process") + + start := time.Now() + err := runProcessOnce(ctx, logger, cfgProc) + took := time.Since(start) + + if err != nil { + logger.Printf(cfgProc.Name, logSepSys, "%v", err) + } else { + logger.Println(cfgProc.Name, logSepSys, "exit status 0") + } + + if err := ctx.Err(); err != nil { + return + } + + wait = ((wait * 2) - took).Truncate(time.Millisecond) + + if wait < cfgProc.MinWait { + wait = cfgProc.MinWait + } else if wait > cfgProc.MaxWait { + wait = cfgProc.MaxWait + } + + logger.Printf(cfgProc.Name, logSepSys, "will restart process in %v", wait) + select { + case <-time.After(wait): + case <-ctx.Done(): + return + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +type config struct { + TimeFormat string `yaml:"timeFormat"` + Processes []configProcess `yaml:"processes"` +} + +func (cfg config) init() (config, error) { + if cfg.TimeFormat == "" { + cfg.TimeFormat = "2006-01-02T15:04:05.000Z07:00" + } + + if len(cfg.Processes) == 0 { + return config{}, errors.New("no processes defined") + } + + for i, cfgProc := range cfg.Processes { + if cfgProc.MinWait == 0 { + cfgProc.MinWait = 1 * time.Second + } + + if cfgProc.MaxWait == 0 { + cfgProc.MaxWait = 64 * time.Second + } + + if cfgProc.SigKillWait == 0 { + cfgProc.SigKillWait = 10 * time.Second + } + + cfg.Processes[i] = cfgProc + } + + return cfg, nil +} + +func main() { + + cfgPath := flag.String("c", "./pmux.yml", "Path to config yaml file") + flag.Parse() + + cfgB, err := ioutil.ReadFile(*cfgPath) + if err != nil { + panic(fmt.Sprintf("couldn't read cfg file at %q: %v", *cfgPath, err)) + } + + var cfg config + if err := yaml.Unmarshal(cfgB, &cfg); err != nil { + panic(fmt.Sprintf("couldn't parse cfg file: %v", err)) + + } else if cfg, err = cfg.init(); err != nil { + panic(fmt.Sprintf("initializing config: %v", err)) + } + + pnames := make([]string, len(cfg.Processes)) + for i, cfgProc := range cfg.Processes { + pnames[i] = cfgProc.Name + } + + logger := newLogger(os.Stdout, cfg.TimeFormat, pnames) + defer logger.Close() + defer logger.Println(pmuxPName, logSepSys, "exited gracefully, ciao!") + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + sigCh := make(chan os.Signal) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + sig := <-sigCh + logger.Printf(pmuxPName, logSepSys, "%v signal received, killing all sub-processes", sig) + cancel() + + <-sigCh + logger.Printf(pmuxPName, logSepSys, "forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!") + logger.Close() + os.Exit(1) + }() + + var wg sync.WaitGroup + defer wg.Wait() + + for _, cfgProc := range cfg.Processes { + wg.Add(1) + go func(cfgProc configProcess) { + defer wg.Done() + runProcess(ctx, logger, cfgProc) + logger.Printf(cfgProc.Name, logSepSys, "stopped process handler") + }(cfgProc) + } +} diff --git a/pmux-example.yml b/pmux-example.yml new file mode 100644 index 0000000..dd0c7bd --- /dev/null +++ b/pmux-example.yml @@ -0,0 +1,44 @@ +# This is an example of a pmux configuration file. + +# timeFormat defines the formatting of timestamps. See +# https://pkg.go.dev/time#pkg-constants for more info on how the formatting +# string works. +timeFormat: "2006-01-02T15:04:05.000Z07:00" + +# processes is the only required field, it must have at least one process +# defined. +processes: + + # each process must have a name and cmd. + - name: pinger + cmd: /bin/bash + args: + - "-c" + - while ping -c1 $TARGET; do sleep 1; done + + env: + TARGET: example.com + + # pmux uses an exponential backoff when restarting a process, so subsequent + # restarts will each take longer and longer. minWait/maxWait indicate the + # min/max wait times between restarts of this process, respectively. + # + # The values shown here are the defaults if none are given. + minWait: 1s + maxWait: 64s + + # once pmux has signalled a process to stop, it will wait this long for the + # process to exit before sending it a SIGKILL (aka a kill -9). + sigKillWait: 10s + + # This process will not immediately exit when pmux tells it to do so, but pmux + # will SIGKILL it after sigKillWait has elapsed. + - name: stubborn-pinger + cmd: /bin/bash + args: + - "-c" + - | + trap "echo 'i will never stop, you will have to SIGKILL me!'" SIGINT + while ping -c1 example.com; do sleep 1; done + + sigKillWait: 1s