diff --git a/README.md b/README.md index 7d6ccd1..b03dcab 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,6 @@ stubborn-pinger › rtt min/avg/max/mdev = 11.161/11.161/11.161/0.000 ms ... Ctrl-C ... -pmux ~ interrupt signal received, killing all sub-processes stubborn-pinger » i will never stop, you will have to SIGKILL me! pinger ~ exit code -1, process exited: signal: interrupt pinger ~ stopped process handler @@ -75,5 +74,4 @@ stubborn-pinger › rtt min/avg/max/mdev = 14.793/14.793/14.793/0.000 ms stubborn-pinger ~ forcefully killing process stubborn-pinger ~ exit code -1, process exited: signal: killed stubborn-pinger ~ stopped process handler -pmux ~ exited gracefully, ciao! ``` diff --git a/main.go b/main.go index a4b2714..33dfb06 100644 --- a/main.go +++ b/main.go @@ -1,164 +1,19 @@ package main import ( - "bufio" "context" - "errors" "flag" "fmt" - "io" "io/ioutil" "os" "os/signal" - "strings" - "sync" "syscall" - "time" - "github.com/cryptic-io/pmux/pmuxproc" + "github.com/cryptic-io/pmux/pmuxlib" "gopkg.in/yaml.v2" ) -// pname used by pmux itself for logging. -const pmuxPName = "pmux" - -// characters used to denote different kinds of logs -const ( - logSepStdout = '›' - logSepStderr = '»' - logSepSys = '~' -) - -type logger struct { - timeFmt string - - l *sync.Mutex - out io.Writer - outBuf *bufio.Writer - - // maxPNameLen is a pointer because it changes when WithPrefix is called. - maxPNameLen *uint64 - - pname string - sep rune -} - -func newLogger( - out io.Writer, - sep rune, - timeFmt string, -) *logger { - - pname := pmuxPName - maxPNameLen := uint64(len(pname)) - - l := &logger{ - timeFmt: timeFmt, - maxPNameLen: &maxPNameLen, - l: new(sync.Mutex), - out: out, - outBuf: bufio.NewWriter(out), - pname: pname, - sep: sep, - } - - return l -} - -func (l *logger) withSep(sep rune) *logger { - l2 := *l - l2.sep = sep - return &l2 -} - -func (l *logger) withPName(pname string) *logger { - l2 := *l - l2.pname = pname - - l2.l.Lock() - defer l2.l.Unlock() - - if pnameLen := uint64(len(pname)); pnameLen > *l2.maxPNameLen { - *l2.maxPNameLen = pnameLen - } - - return &l2 -} - -func (l *logger) Close() { - - l.l.Lock() - defer l.l.Unlock() - - l.outBuf.Flush() - - if syncer, ok := l.out.(interface{ Sync() error }); ok { - _ = syncer.Sync() - } else if flusher, ok := l.out.(interface{ Flush() error }); ok { - _ = flusher.Flush() - } - - // this generally shouldn't be necessary, but we could run into cases (e.g. - // during a force-kill) where further Prints are called after a Close. These - // should just do nothing. - l.out = ioutil.Discard - l.outBuf = bufio.NewWriter(l.out) -} - -func (l *logger) println(line string) { - - l.l.Lock() - defer l.l.Unlock() - - if l.timeFmt != "" { - fmt.Fprintf( - l.outBuf, - "%s %c ", - time.Now().Format(l.timeFmt), - l.sep, - ) - } - - fmt.Fprintf( - l.outBuf, - "%s%s%c %s\n", - l.pname, - strings.Repeat(" ", int(*l.maxPNameLen+1)-len(l.pname)), - l.sep, - line, - ) - - l.outBuf.Flush() -} - -func (l *logger) Println(line string) { - l.println(line) -} - -func (l *logger) Printf(msg string, args ...interface{}) { - l.Println(fmt.Sprintf(msg, args...)) -} - -type processConfig struct { - pmuxproc.Config `yaml:",inline"` - Name string `yaml:"name"` -} - -type config struct { - TimeFormat string `yaml:"timeFormat"` - Processes []processConfig `yaml:"processes"` -} - -func (cfg config) init() (config, error) { - - if len(cfg.Processes) == 0 { - return config{}, errors.New("no processes defined") - } - - return cfg, nil -} - func main() { cfgPath := flag.String("c", "./pmux.yml", "Path to config yaml file") @@ -169,58 +24,24 @@ func main() { panic(fmt.Sprintf("couldn't read cfg file at %q: %v", *cfgPath, err)) } - var cfg config + var cfg pmuxlib.Config if err := yaml.Unmarshal(cfgB, &cfg); err != nil { panic(fmt.Sprintf("couldn't parse cfg file: %v", err)) - - } else if cfg, err = cfg.init(); err != nil { - panic(fmt.Sprintf("initializing config: %v", err)) } - stdoutLogger := newLogger(os.Stdout, logSepStdout, cfg.TimeFormat) - defer stdoutLogger.Close() - - stderrLogger := newLogger(os.Stderr, logSepStderr, cfg.TimeFormat) - defer stderrLogger.Close() - - sysLogger := stderrLogger.withSep(logSepSys) - - defer sysLogger.Println("exited gracefully, ciao!") - ctx, cancel := context.WithCancel(context.Background()) go func() { sigCh := make(chan os.Signal, 2) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) - sig := <-sigCh - sysLogger.Printf("%v signal received, waiting for child processes to exit", sig) + <-sigCh cancel() <-sigCh - sysLogger.Printf("forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!") - sysLogger.Close() + fmt.Fprintln(os.Stderr, "forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!") + os.Stderr.Sync() os.Exit(1) }() - var wg sync.WaitGroup - defer wg.Wait() - - for _, cfgProc := range cfg.Processes { - wg.Add(1) - go func(procCfg processConfig) { - defer wg.Done() - - stdoutLogger := stdoutLogger.withPName(procCfg.Name) - stderrLogger := stderrLogger.withPName(procCfg.Name) - sysLogger := sysLogger.withPName(procCfg.Name) - - defer sysLogger.Printf("stopped process handler") - - sysLogger.Println("starting process") - - pmuxproc.RunProcess( - ctx, stdoutLogger, stderrLogger, sysLogger, procCfg.Config, - ) - }(cfgProc) - } + pmuxlib.Run(ctx, cfg) } diff --git a/pmuxlib/logger.go b/pmuxlib/logger.go new file mode 100644 index 0000000..f3d2239 --- /dev/null +++ b/pmuxlib/logger.go @@ -0,0 +1,158 @@ +package pmuxlib + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "strings" + "sync" + "time" +) + +// pname used by pmux itself for logging. +const pmuxPName = "pmux" + +// characters used to denote different kinds of logs +const ( + logSepStdout = '›' + logSepStderr = '»' + logSepSys = '~' +) + +// Logger is used by RunProcess to log process details in realtime. You can use +// a new(NullLogger) if you don't care. +type Logger interface { + Println(string) + Printf(string, ...interface{}) +} + +// NullLogger is an implementation of Logger which doesn't do anything. +type NullLogger struct{} + +func (*NullLogger) Println(string) {} +func (*NullLogger) Printf(string, ...interface{}) {} + +// PlainLogger implements Logger by writing each line directly to the given +// io.Writer as-is. +type PlainLogger struct { + io.Writer +} + +func (l PlainLogger) Println(line string) { + fmt.Fprintln(l, line) +} + +func (l PlainLogger) Printf(str string, args ...interface{}) { + fmt.Fprintf(l, str, args...) +} + +type logger struct { + timeFmt string + + l *sync.Mutex + out io.Writer + outBuf *bufio.Writer + + // maxPNameLen is a pointer because it changes when WithPrefix is called. + maxPNameLen *uint64 + + pname string + sep rune +} + +func newLogger( + out io.Writer, + sep rune, + timeFmt string, +) *logger { + + pname := pmuxPName + maxPNameLen := uint64(len(pname)) + + l := &logger{ + timeFmt: timeFmt, + maxPNameLen: &maxPNameLen, + l: new(sync.Mutex), + out: out, + outBuf: bufio.NewWriter(out), + pname: pname, + sep: sep, + } + + return l +} + +func (l *logger) withSep(sep rune) *logger { + l2 := *l + l2.sep = sep + return &l2 +} + +func (l *logger) withPName(pname string) *logger { + l2 := *l + l2.pname = pname + + l2.l.Lock() + defer l2.l.Unlock() + + if pnameLen := uint64(len(pname)); pnameLen > *l2.maxPNameLen { + *l2.maxPNameLen = pnameLen + } + + return &l2 +} + +func (l *logger) Close() { + + l.l.Lock() + defer l.l.Unlock() + + l.outBuf.Flush() + + if syncer, ok := l.out.(interface{ Sync() error }); ok { + _ = syncer.Sync() + } else if flusher, ok := l.out.(interface{ Flush() error }); ok { + _ = flusher.Flush() + } + + // this generally shouldn't be necessary, but we could run into cases (e.g. + // during a force-kill) where further Prints are called after a Close. These + // should just do nothing. + l.out = ioutil.Discard + l.outBuf = bufio.NewWriter(l.out) +} + +func (l *logger) println(line string) { + + l.l.Lock() + defer l.l.Unlock() + + if l.timeFmt != "" { + fmt.Fprintf( + l.outBuf, + "%s %c ", + time.Now().Format(l.timeFmt), + l.sep, + ) + } + + fmt.Fprintf( + l.outBuf, + "%s%s%c %s\n", + l.pname, + strings.Repeat(" ", int(*l.maxPNameLen+1)-len(l.pname)), + l.sep, + line, + ) + + l.outBuf.Flush() +} + +func (l *logger) Println(line string) { + l.println(line) +} + +func (l *logger) Printf(msg string, args ...interface{}) { + l.Println(fmt.Sprintf(msg, args...)) +} diff --git a/pmuxlib/pmuxlib.go b/pmuxlib/pmuxlib.go new file mode 100644 index 0000000..6012b4d --- /dev/null +++ b/pmuxlib/pmuxlib.go @@ -0,0 +1,49 @@ +// Package pmuxlib implements the process management aspects of the pmux +// process. +package pmuxlib + +import ( + "context" + "os" + "sync" +) + +type Config struct { + TimeFormat string `yaml:"timeFormat"` + Processes []ProcessConfig `yaml:"processes"` +} + +// Run runs the given configuration as if this was a real pmux process. +func Run(ctx context.Context, cfg Config) { + + stdoutLogger := newLogger(os.Stdout, logSepStdout, cfg.TimeFormat) + defer stdoutLogger.Close() + + stderrLogger := newLogger(os.Stderr, logSepStderr, cfg.TimeFormat) + defer stderrLogger.Close() + + sysLogger := stderrLogger.withSep(logSepSys) + defer sysLogger.Println("exited gracefully, ciao!") + + var wg sync.WaitGroup + defer wg.Wait() + + for _, cfgProc := range cfg.Processes { + wg.Add(1) + go func(procCfg ProcessConfig) { + defer wg.Done() + + stdoutLogger := stdoutLogger.withPName(procCfg.Name) + stderrLogger := stderrLogger.withPName(procCfg.Name) + sysLogger := sysLogger.withPName(procCfg.Name) + + sysLogger.Println("starting process") + defer sysLogger.Println("stopped process handler") + + RunProcess( + ctx, stdoutLogger, stderrLogger, sysLogger, procCfg, + ) + + }(cfgProc) + } +} diff --git a/pmuxproc/pmuxproc.go b/pmuxlib/proc.go similarity index 86% rename from pmuxproc/pmuxproc.go rename to pmuxlib/proc.go index f2c10e3..af8823b 100644 --- a/pmuxproc/pmuxproc.go +++ b/pmuxlib/proc.go @@ -1,6 +1,4 @@ -// Package pmuxproc implements the process management aspects of the pmux -// process. -package pmuxproc +package pmuxlib import ( "bufio" @@ -15,8 +13,11 @@ import ( "time" ) -// Config is used to configure a process via RunProcess. -type Config struct { +// ProcessConfig is used to configure a process via RunProcess. +type ProcessConfig struct { + + // Name of the process to be run. This only gets used by RunPmux. + Name string // Cmd and Args describe the actual process to run. Cmd string `yaml:"cmd"` @@ -48,7 +49,7 @@ type Config struct { NoRestartOn []int `yaml:"noRestartOn"` } -func (cfg Config) withDefaults() Config { +func (cfg ProcessConfig) withDefaults() ProcessConfig { if cfg.MinWait == 0 { cfg.MinWait = 1 * time.Second @@ -65,10 +66,10 @@ func (cfg Config) withDefaults() Config { return cfg } -// RunProcessOnce runs the process described by the Config (though it doesn't -// use all fields from the Config). The process is killed if the context is -// canceled. The exit status of the process is returned, or -1 if the process -// was never started. +// RunProcessOnce runs the process described by the ProcessConfig (though it +// doesn't use all fields from the ProcessConfig). The process is killed if the +// context is canceled. The exit status of the process is returned, or -1 if the +// process was never started. // // It returns nil if the process exits normally with a zero status. It returns // an error otherwise. @@ -78,7 +79,7 @@ func (cfg Config) withDefaults() Config { func RunProcessOnce( ctx context.Context, stdoutLogger, stderrLogger, sysLogger Logger, - cfg Config, + cfg ProcessConfig, ) ( int, error, ) { @@ -165,8 +166,8 @@ func RunProcessOnce( return exitCode, nil } -// RunProcess is a process (configured by Config) until the context is canceled, -// at which point the process is killed and RunProcess returns. +// RunProcess is a process (configured by ProcessConfig) until the context is +// canceled, at which point the process is killed and RunProcess returns. // // The process will be restarted if it exits of its own accord. There will be a // brief wait time between each restart, with an exponential backup mechanism so @@ -177,7 +178,7 @@ func RunProcessOnce( func RunProcess( ctx context.Context, stdoutLogger, stderrLogger, sysLogger Logger, - cfg Config, + cfg ProcessConfig, ) { cfg = cfg.withDefaults() diff --git a/pmuxproc/logger.go b/pmuxproc/logger.go deleted file mode 100644 index d330538..0000000 --- a/pmuxproc/logger.go +++ /dev/null @@ -1,33 +0,0 @@ -package pmuxproc - -import ( - "fmt" - "io" -) - -// Logger is used by RunProcess to log process details in realtime. You can use -// a new(NullLogger) if you don't care. -type Logger interface { - Println(string) - Printf(string, ...interface{}) -} - -// NullLogger is an implementation of Logger which doesn't do anything. -type NullLogger struct{} - -func (*NullLogger) Println(string) {} -func (*NullLogger) Printf(string, ...interface{}) {} - -// PlainLogger implements Logger by writing each line directly to the given -// io.Writer as-is. -type PlainLogger struct { - io.Writer -} - -func (l PlainLogger) Println(line string) { - fmt.Fprintln(l, line) -} - -func (l PlainLogger) Printf(str string, args ...interface{}) { - fmt.Fprintf(l, str, args...) -}