From fd793d4f81f40ec500cd727413fba9359466a7a9 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Sat, 22 Jan 2022 20:03:13 -0700 Subject: [PATCH] break pmuxproc out, make logger better, and properly set import path in go.mod --- go.mod | 4 +- go.sum | 1 + main.go | 248 +++++++++++-------------------------------- pmuxproc/logger.go | 14 +++ pmuxproc/pmuxproc.go | 186 ++++++++++++++++++++++++++++++++ 5 files changed, 265 insertions(+), 188 deletions(-) create mode 100644 pmuxproc/logger.go create mode 100644 pmuxproc/pmuxproc.go diff --git a/go.mod b/go.mod index b41cd33..0f3e994 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ -module pmux +module github.com/cryptic-io/pmux go 1.16 -require gopkg.in/yaml.v2 v2.4.0 // indirect +require gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 7534661..dd0bc19 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go index f434539..e14ee50 100644 --- a/main.go +++ b/main.go @@ -9,13 +9,14 @@ import ( "io" "io/ioutil" "os" - "os/exec" "os/signal" "strings" "sync" "syscall" "time" + "github.com/cryptic-io/pmux/pmuxproc" + "gopkg.in/yaml.v2" ) @@ -29,38 +30,40 @@ const ( ) type logger struct { - timeFmt string - maxPNameLen int + timeFmt string - l sync.Mutex + l *sync.Mutex out io.Writer outBuf *bufio.Writer - wg sync.WaitGroup + // maxPNameLen is a pointer because it changes when WithPrefix is called. + maxPNameLen *uint64 + + wg *sync.WaitGroup closeCh chan struct{} + + pname string + sep rune } func newLogger( out io.Writer, timeFmt string, - possiblePNames []string, ) *logger { - maxPNameLen := 0 - for _, pname := range possiblePNames { - if l := len(pname); l > maxPNameLen { - maxPNameLen = l - } - } - - maxPNameLen++ + pname := pmuxPName + maxPNameLen := uint64(len(pname)) l := &logger{ timeFmt: timeFmt, - maxPNameLen: maxPNameLen, + maxPNameLen: &maxPNameLen, + l: new(sync.Mutex), out: out, outBuf: bufio.NewWriter(out), + wg: new(sync.WaitGroup), closeCh: make(chan struct{}), + pname: pname, + sep: logSepSys, } l.wg.Add(1) @@ -72,6 +75,21 @@ func newLogger( return l } +func (l *logger) WithPrefix(pname string, sep rune) *logger { + l2 := *l + l2.pname = pname + l2.sep = sep + + l2.l.Lock() + defer l2.l.Unlock() + + if pnameLen := uint64(len(pname)); pnameLen > *l2.maxPNameLen { + *l2.maxPNameLen = pnameLen + } + + return &l2 +} + func (l *logger) Close() { l.l.Lock() @@ -83,9 +101,9 @@ func (l *logger) Close() { l.outBuf.Flush() if syncer, ok := l.out.(interface{ Sync() error }); ok { - syncer.Sync() + _ = syncer.Sync() } else if flusher, ok := l.out.(interface{ Flush() error }); ok { - flusher.Flush() + _ = flusher.Flush() } // this generally shouldn't be necessary, but we could run into cases (e.g. @@ -96,6 +114,7 @@ func (l *logger) Close() { } func (l *logger) flusher() { + ticker := time.NewTicker(250 * time.Millisecond) defer ticker.Stop() @@ -111,7 +130,7 @@ func (l *logger) flusher() { } } -func (l *logger) Println(pname string, sep rune, line string) { +func (l *logger) println(line string) { l.l.Lock() defer l.l.Unlock() @@ -120,160 +139,34 @@ func (l *logger) Println(pname string, sep rune, line string) { l.outBuf, "%s %c %s%s%c %s\n", time.Now().Format(l.timeFmt), - sep, - pname, - strings.Repeat(" ", l.maxPNameLen-len(pname)), - sep, + l.sep, + l.pname, + strings.Repeat(" ", int(*l.maxPNameLen+1)-len(l.pname)), + l.sep, line, ) } -func (l *logger) Printf(pname string, sep rune, msg string, args ...interface{}) { - l.Println(pname, sep, fmt.Sprintf(msg, args...)) +func (l *logger) Println(line string) { + l.println(line) } -//////////////////////////////////////////////////////////////////////////////// - -type configProcess struct { - Name string `yaml:"name"` - Cmd string `yaml:"cmd"` - Args []string `yaml:"args"` - Env map[string]string `yaml:"env"` - MinWait time.Duration `yaml:"minWait"` - MaxWait time.Duration `yaml:"maxWait"` - SigKillWait time.Duration `yaml:"sigKillWait"` +func (l *logger) Printf(msg string, args ...interface{}) { + l.Println(fmt.Sprintf(msg, args...)) } -func runProcessOnce( - ctx context.Context, logger *logger, cfgProc configProcess, -) error { - - var wg sync.WaitGroup - - fwdOutPipe := func(r io.Reader) { - wg.Add(1) - go func() { - defer wg.Done() - bufR := bufio.NewReader(r) - for { - line, err := bufR.ReadString('\n') - if errors.Is(err, io.EOF) { - return - } else if err != nil { - logger.Printf(cfgProc.Name, logSepSys, "reading output: %v", err) - return - } - - logger.Println(cfgProc.Name, logSepProcOut, strings.TrimSuffix(line, "\n")) - } - }() - } - - cmd := exec.Command(cfgProc.Cmd, cfgProc.Args...) - - cmd.Env = make([]string, 0, len(cfgProc.Env)) - for k, v := range cfgProc.Env { - cmd.Env = append(cmd.Env, k+"="+v) - } - - stdout, err := cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("getting stdout pipe: %w", err) - } - defer stdout.Close() - - stderr, err := cmd.StderrPipe() - if err != nil { - return fmt.Errorf("getting stderr pipe: %w", err) - } - defer stderr.Close() - - fwdOutPipe(stdout) - fwdOutPipe(stderr) - - if err := cmd.Start(); err != nil { - return fmt.Errorf("starting process: %w", err) - } - - // go-routine which will sent interrupt if the context is cancelled. Also - // waits on a secondary channel, which is closed when this function returns, - // in order to ensure this go-routine always gets cleaned up. - stopCh := make(chan struct{}) - defer close(stopCh) - go func(proc *os.Process) { - select { - case <-ctx.Done(): - proc.Signal(os.Interrupt) - case <-stopCh: - return - } - - select { - case <-time.After(cfgProc.SigKillWait): - logger.Println(cfgProc.Name, logSepSys, "forcefully killing process") - proc.Signal(os.Kill) - case <-stopCh: - return - } - }(cmd.Process) - - wg.Wait() - - if err := cmd.Wait(); err != nil { - return fmt.Errorf("process exited: %w", err) - } - - return nil +type processConfig struct { + pmuxproc.Config `yaml:",inline"` + Name string `yaml:"name"` } -func runProcess( - ctx context.Context, logger *logger, cfgProc configProcess, -) { - - var wait time.Duration - - for { - logger.Println(cfgProc.Name, logSepSys, "starting process") - - start := time.Now() - err := runProcessOnce(ctx, logger, cfgProc) - took := time.Since(start) - - if err != nil { - logger.Printf(cfgProc.Name, logSepSys, "%v", err) - } else { - logger.Println(cfgProc.Name, logSepSys, "exit status 0") - } - - if err := ctx.Err(); err != nil { - return - } - - wait = ((wait * 2) - took).Truncate(time.Millisecond) - - if wait < cfgProc.MinWait { - wait = cfgProc.MinWait - } else if wait > cfgProc.MaxWait { - wait = cfgProc.MaxWait - } - - logger.Printf(cfgProc.Name, logSepSys, "will restart process in %v", wait) - select { - case <-time.After(wait): - case <-ctx.Done(): - return - } - } -} - -//////////////////////////////////////////////////////////////////////////////// - type config struct { TimeFormat string `yaml:"timeFormat"` - Processes []configProcess `yaml:"processes"` + Processes []processConfig `yaml:"processes"` } func (cfg config) init() (config, error) { + if cfg.TimeFormat == "" { cfg.TimeFormat = "2006-01-02T15:04:05.000Z07:00" } @@ -282,22 +175,6 @@ func (cfg config) init() (config, error) { return config{}, errors.New("no processes defined") } - for i, cfgProc := range cfg.Processes { - if cfgProc.MinWait == 0 { - cfgProc.MinWait = 1 * time.Second - } - - if cfgProc.MaxWait == 0 { - cfgProc.MaxWait = 64 * time.Second - } - - if cfgProc.SigKillWait == 0 { - cfgProc.SigKillWait = 10 * time.Second - } - - cfg.Processes[i] = cfgProc - } - return cfg, nil } @@ -319,26 +196,21 @@ func main() { panic(fmt.Sprintf("initializing config: %v", err)) } - pnames := make([]string, len(cfg.Processes)) - for i, cfgProc := range cfg.Processes { - pnames[i] = cfgProc.Name - } - - logger := newLogger(os.Stdout, cfg.TimeFormat, pnames) + logger := newLogger(os.Stdout, cfg.TimeFormat) defer logger.Close() - defer logger.Println(pmuxPName, logSepSys, "exited gracefully, ciao!") + defer logger.Println("exited gracefully, ciao!") ctx, cancel := context.WithCancel(context.Background()) go func() { - sigCh := make(chan os.Signal) + sigCh := make(chan os.Signal, 2) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) sig := <-sigCh - logger.Printf(pmuxPName, logSepSys, "%v signal received, killing all sub-processes", sig) + logger.Printf("%v signal received, killing all sub-processes", sig) cancel() <-sigCh - logger.Printf(pmuxPName, logSepSys, "forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!") + logger.Printf("forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!") logger.Close() os.Exit(1) }() @@ -348,10 +220,14 @@ func main() { for _, cfgProc := range cfg.Processes { wg.Add(1) - go func(cfgProc configProcess) { + go func(procCfg processConfig) { defer wg.Done() - runProcess(ctx, logger, cfgProc) - logger.Printf(cfgProc.Name, logSepSys, "stopped process handler") + + logger := logger.WithPrefix(procCfg.Name, logSepProcOut) + defer logger.Printf("stopped process handler") + + logger.Println("starting process") + pmuxproc.RunProcess(ctx, logger, procCfg.Config) }(cfgProc) } } diff --git a/pmuxproc/logger.go b/pmuxproc/logger.go new file mode 100644 index 0000000..d5980f1 --- /dev/null +++ b/pmuxproc/logger.go @@ -0,0 +1,14 @@ +package pmuxproc + +// Logger is used by RunProcess to log process details in realtime. You can use +// a new(NullLogger) if you don't care. +type Logger interface { + Println(string) + Printf(string, ...interface{}) +} + +// NullLogger is an implementation of Logger which doesn't do anything. +type NullLogger struct{} + +func (*NullLogger) Println(string) {} +func (*NullLogger) Printf(string, ...interface{}) {} diff --git a/pmuxproc/pmuxproc.go b/pmuxproc/pmuxproc.go new file mode 100644 index 0000000..a807ced --- /dev/null +++ b/pmuxproc/pmuxproc.go @@ -0,0 +1,186 @@ +// Package pmuxproc implements the process management aspects of the pmux +// process. +package pmuxproc + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "strings" + "sync" + "time" +) + +// Config is used to configure a process via RunProcess. +type Config struct { + + // Cmd and Args describe the actual process to run. + Cmd string `yaml:"cmd"` + Args []string `yaml:"args"` + + // Env describes the environment variables to set on the process. + Env map[string]string `yaml:"env"` + + // MinWait and MaxWait are the minimum and maximum amount of time between + // restarts that RunProcess will wait. + // + // MinWait defaults to 1 second. + // MaxWait defaults to 64 seconds. + MinWait time.Duration `yaml:"minWait"` + MaxWait time.Duration `yaml:"maxWait"` + + // SigKillWait is the amount of time after the process is sent a SIGINT + // before RunProcess sends it a SIGKILL. + // + // Defalts to 10 seconds. + SigKillWait time.Duration `yaml:"sigKillWait"` +} + +func (cfg Config) withDefaults() Config { + + if cfg.MinWait == 0 { + cfg.MinWait = 1 * time.Second + } + + if cfg.MaxWait == 0 { + cfg.MaxWait = 64 * time.Second + } + + if cfg.SigKillWait == 0 { + cfg.SigKillWait = 10 * time.Second + } + + return cfg +} + +func runProcessOnce(ctx context.Context, logger Logger, cfg Config) error { + + var wg sync.WaitGroup + + fwdOutPipe := func(r io.Reader) { + wg.Add(1) + go func() { + defer wg.Done() + bufR := bufio.NewReader(r) + for { + line, err := bufR.ReadString('\n') + if errors.Is(err, io.EOF) { + return + } else if err != nil { + logger.Printf("reading output: %v", err) + return + } + + logger.Println(strings.TrimSuffix(line, "\n")) + } + }() + } + + cmd := exec.Command(cfg.Cmd, cfg.Args...) + + cmd.Env = make([]string, 0, len(cfg.Env)) + for k, v := range cfg.Env { + cmd.Env = append(cmd.Env, k+"="+v) + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("getting stdout pipe: %w", err) + } + defer stdout.Close() + + stderr, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("getting stderr pipe: %w", err) + } + defer stderr.Close() + + fwdOutPipe(stdout) + fwdOutPipe(stderr) + + if err := cmd.Start(); err != nil { + return fmt.Errorf("starting process: %w", err) + } + + // go-routine which will sent interrupt if the context is cancelled. Also + // waits on a secondary channel, which is closed when this function returns, + // in order to ensure this go-routine always gets cleaned up. + stopCh := make(chan struct{}) + defer close(stopCh) + go func(proc *os.Process) { + select { + case <-ctx.Done(): + _ = proc.Signal(os.Interrupt) + case <-stopCh: + return + } + + select { + case <-time.After(cfg.SigKillWait): + logger.Println("forcefully killing process") + _ = proc.Signal(os.Kill) + case <-stopCh: + return + } + }(cmd.Process) + + wg.Wait() + + if err := cmd.Wait(); err != nil { + return fmt.Errorf("process exited: %w", err) + } + + return nil +} + +// RunProcess is a process (configured by Config) until the context is canceled, +// at which point the process is killed and RunProcess returns. +// +// The process will be restarted if it exits of its own accord. There will be a +// brief wait time between each restart, with an exponential backup mechanism so +// that the wait time increases upon repeated restarts. +// +// The stdout and stderr of the process will be written to the given Logger, as +// well as various runtime events. +func RunProcess(ctx context.Context, logger Logger, cfg Config) { + + cfg = cfg.withDefaults() + + var wait time.Duration + + for { + start := time.Now() + err := runProcessOnce(ctx, logger, cfg) + took := time.Since(start) + + if err != nil { + logger.Printf("%v", err) + } else { + logger.Println("exit status 0") + } + + if err := ctx.Err(); err != nil { + return + } + + wait = ((wait * 2) - took).Truncate(time.Millisecond) + + if wait < cfg.MinWait { + wait = cfg.MinWait + } else if wait > cfg.MaxWait { + wait = cfg.MaxWait + } + + logger.Printf("will restart process in %v", wait) + + select { + case <-time.After(wait): + case <-ctx.Done(): + return + } + } +}