Expose Run from pmuxlib (renamed from pmuxproc)

This commit is contained in:
Brian Picciano 2022-06-18 19:52:04 -06:00
parent cfc6166135
commit 8fb99b53d7
6 changed files with 228 additions and 234 deletions

View File

@ -62,7 +62,6 @@ stubborn-pinger rtt min/avg/max/mdev = 11.161/11.161/11.161/0.000 ms
... Ctrl-C ... ... Ctrl-C ...
pmux ~ interrupt signal received, killing all sub-processes
stubborn-pinger » i will never stop, you will have to SIGKILL me! stubborn-pinger » i will never stop, you will have to SIGKILL me!
pinger ~ exit code -1, process exited: signal: interrupt pinger ~ exit code -1, process exited: signal: interrupt
pinger ~ stopped process handler pinger ~ stopped process handler
@ -75,5 +74,4 @@ stubborn-pinger rtt min/avg/max/mdev = 14.793/14.793/14.793/0.000 ms
stubborn-pinger ~ forcefully killing process stubborn-pinger ~ forcefully killing process
stubborn-pinger ~ exit code -1, process exited: signal: killed stubborn-pinger ~ exit code -1, process exited: signal: killed
stubborn-pinger ~ stopped process handler stubborn-pinger ~ stopped process handler
pmux ~ exited gracefully, ciao!
``` ```

191
main.go
View File

@ -1,164 +1,19 @@
package main package main
import ( import (
"bufio"
"context" "context"
"errors"
"flag" "flag"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"os/signal" "os/signal"
"strings"
"sync"
"syscall" "syscall"
"time"
"github.com/cryptic-io/pmux/pmuxproc" "github.com/cryptic-io/pmux/pmuxlib"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
// pname used by pmux itself for logging.
const pmuxPName = "pmux"
// characters used to denote different kinds of logs
const (
logSepStdout = ''
logSepStderr = '»'
logSepSys = '~'
)
type logger struct {
timeFmt string
l *sync.Mutex
out io.Writer
outBuf *bufio.Writer
// maxPNameLen is a pointer because it changes when WithPrefix is called.
maxPNameLen *uint64
pname string
sep rune
}
func newLogger(
out io.Writer,
sep rune,
timeFmt string,
) *logger {
pname := pmuxPName
maxPNameLen := uint64(len(pname))
l := &logger{
timeFmt: timeFmt,
maxPNameLen: &maxPNameLen,
l: new(sync.Mutex),
out: out,
outBuf: bufio.NewWriter(out),
pname: pname,
sep: sep,
}
return l
}
func (l *logger) withSep(sep rune) *logger {
l2 := *l
l2.sep = sep
return &l2
}
func (l *logger) withPName(pname string) *logger {
l2 := *l
l2.pname = pname
l2.l.Lock()
defer l2.l.Unlock()
if pnameLen := uint64(len(pname)); pnameLen > *l2.maxPNameLen {
*l2.maxPNameLen = pnameLen
}
return &l2
}
func (l *logger) Close() {
l.l.Lock()
defer l.l.Unlock()
l.outBuf.Flush()
if syncer, ok := l.out.(interface{ Sync() error }); ok {
_ = syncer.Sync()
} else if flusher, ok := l.out.(interface{ Flush() error }); ok {
_ = flusher.Flush()
}
// this generally shouldn't be necessary, but we could run into cases (e.g.
// during a force-kill) where further Prints are called after a Close. These
// should just do nothing.
l.out = ioutil.Discard
l.outBuf = bufio.NewWriter(l.out)
}
func (l *logger) println(line string) {
l.l.Lock()
defer l.l.Unlock()
if l.timeFmt != "" {
fmt.Fprintf(
l.outBuf,
"%s %c ",
time.Now().Format(l.timeFmt),
l.sep,
)
}
fmt.Fprintf(
l.outBuf,
"%s%s%c %s\n",
l.pname,
strings.Repeat(" ", int(*l.maxPNameLen+1)-len(l.pname)),
l.sep,
line,
)
l.outBuf.Flush()
}
func (l *logger) Println(line string) {
l.println(line)
}
func (l *logger) Printf(msg string, args ...interface{}) {
l.Println(fmt.Sprintf(msg, args...))
}
type processConfig struct {
pmuxproc.Config `yaml:",inline"`
Name string `yaml:"name"`
}
type config struct {
TimeFormat string `yaml:"timeFormat"`
Processes []processConfig `yaml:"processes"`
}
func (cfg config) init() (config, error) {
if len(cfg.Processes) == 0 {
return config{}, errors.New("no processes defined")
}
return cfg, nil
}
func main() { func main() {
cfgPath := flag.String("c", "./pmux.yml", "Path to config yaml file") cfgPath := flag.String("c", "./pmux.yml", "Path to config yaml file")
@ -169,58 +24,24 @@ func main() {
panic(fmt.Sprintf("couldn't read cfg file at %q: %v", *cfgPath, err)) panic(fmt.Sprintf("couldn't read cfg file at %q: %v", *cfgPath, err))
} }
var cfg config var cfg pmuxlib.Config
if err := yaml.Unmarshal(cfgB, &cfg); err != nil { if err := yaml.Unmarshal(cfgB, &cfg); err != nil {
panic(fmt.Sprintf("couldn't parse cfg file: %v", err)) panic(fmt.Sprintf("couldn't parse cfg file: %v", err))
} else if cfg, err = cfg.init(); err != nil {
panic(fmt.Sprintf("initializing config: %v", err))
} }
stdoutLogger := newLogger(os.Stdout, logSepStdout, cfg.TimeFormat)
defer stdoutLogger.Close()
stderrLogger := newLogger(os.Stderr, logSepStderr, cfg.TimeFormat)
defer stderrLogger.Close()
sysLogger := stderrLogger.withSep(logSepSys)
defer sysLogger.Println("exited gracefully, ciao!")
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go func() { go func() {
sigCh := make(chan os.Signal, 2) sigCh := make(chan os.Signal, 2)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
sig := <-sigCh <-sigCh
sysLogger.Printf("%v signal received, waiting for child processes to exit", sig)
cancel() cancel()
<-sigCh <-sigCh
sysLogger.Printf("forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!") fmt.Fprintln(os.Stderr, "forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!")
sysLogger.Close() os.Stderr.Sync()
os.Exit(1) os.Exit(1)
}() }()
var wg sync.WaitGroup pmuxlib.Run(ctx, cfg)
defer wg.Wait()
for _, cfgProc := range cfg.Processes {
wg.Add(1)
go func(procCfg processConfig) {
defer wg.Done()
stdoutLogger := stdoutLogger.withPName(procCfg.Name)
stderrLogger := stderrLogger.withPName(procCfg.Name)
sysLogger := sysLogger.withPName(procCfg.Name)
defer sysLogger.Printf("stopped process handler")
sysLogger.Println("starting process")
pmuxproc.RunProcess(
ctx, stdoutLogger, stderrLogger, sysLogger, procCfg.Config,
)
}(cfgProc)
}
} }

158
pmuxlib/logger.go Normal file
View File

@ -0,0 +1,158 @@
package pmuxlib
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"strings"
"sync"
"time"
)
// pname used by pmux itself for logging.
const pmuxPName = "pmux"
// characters used to denote different kinds of logs
const (
logSepStdout = ''
logSepStderr = '»'
logSepSys = '~'
)
// Logger is used by RunProcess to log process details in realtime. You can use
// a new(NullLogger) if you don't care.
type Logger interface {
Println(string)
Printf(string, ...interface{})
}
// NullLogger is an implementation of Logger which doesn't do anything.
type NullLogger struct{}
func (*NullLogger) Println(string) {}
func (*NullLogger) Printf(string, ...interface{}) {}
// PlainLogger implements Logger by writing each line directly to the given
// io.Writer as-is.
type PlainLogger struct {
io.Writer
}
func (l PlainLogger) Println(line string) {
fmt.Fprintln(l, line)
}
func (l PlainLogger) Printf(str string, args ...interface{}) {
fmt.Fprintf(l, str, args...)
}
type logger struct {
timeFmt string
l *sync.Mutex
out io.Writer
outBuf *bufio.Writer
// maxPNameLen is a pointer because it changes when WithPrefix is called.
maxPNameLen *uint64
pname string
sep rune
}
func newLogger(
out io.Writer,
sep rune,
timeFmt string,
) *logger {
pname := pmuxPName
maxPNameLen := uint64(len(pname))
l := &logger{
timeFmt: timeFmt,
maxPNameLen: &maxPNameLen,
l: new(sync.Mutex),
out: out,
outBuf: bufio.NewWriter(out),
pname: pname,
sep: sep,
}
return l
}
func (l *logger) withSep(sep rune) *logger {
l2 := *l
l2.sep = sep
return &l2
}
func (l *logger) withPName(pname string) *logger {
l2 := *l
l2.pname = pname
l2.l.Lock()
defer l2.l.Unlock()
if pnameLen := uint64(len(pname)); pnameLen > *l2.maxPNameLen {
*l2.maxPNameLen = pnameLen
}
return &l2
}
func (l *logger) Close() {
l.l.Lock()
defer l.l.Unlock()
l.outBuf.Flush()
if syncer, ok := l.out.(interface{ Sync() error }); ok {
_ = syncer.Sync()
} else if flusher, ok := l.out.(interface{ Flush() error }); ok {
_ = flusher.Flush()
}
// this generally shouldn't be necessary, but we could run into cases (e.g.
// during a force-kill) where further Prints are called after a Close. These
// should just do nothing.
l.out = ioutil.Discard
l.outBuf = bufio.NewWriter(l.out)
}
func (l *logger) println(line string) {
l.l.Lock()
defer l.l.Unlock()
if l.timeFmt != "" {
fmt.Fprintf(
l.outBuf,
"%s %c ",
time.Now().Format(l.timeFmt),
l.sep,
)
}
fmt.Fprintf(
l.outBuf,
"%s%s%c %s\n",
l.pname,
strings.Repeat(" ", int(*l.maxPNameLen+1)-len(l.pname)),
l.sep,
line,
)
l.outBuf.Flush()
}
func (l *logger) Println(line string) {
l.println(line)
}
func (l *logger) Printf(msg string, args ...interface{}) {
l.Println(fmt.Sprintf(msg, args...))
}

49
pmuxlib/pmuxlib.go Normal file
View File

@ -0,0 +1,49 @@
// Package pmuxlib implements the process management aspects of the pmux
// process.
package pmuxlib
import (
"context"
"os"
"sync"
)
type Config struct {
TimeFormat string `yaml:"timeFormat"`
Processes []ProcessConfig `yaml:"processes"`
}
// Run runs the given configuration as if this was a real pmux process.
func Run(ctx context.Context, cfg Config) {
stdoutLogger := newLogger(os.Stdout, logSepStdout, cfg.TimeFormat)
defer stdoutLogger.Close()
stderrLogger := newLogger(os.Stderr, logSepStderr, cfg.TimeFormat)
defer stderrLogger.Close()
sysLogger := stderrLogger.withSep(logSepSys)
defer sysLogger.Println("exited gracefully, ciao!")
var wg sync.WaitGroup
defer wg.Wait()
for _, cfgProc := range cfg.Processes {
wg.Add(1)
go func(procCfg ProcessConfig) {
defer wg.Done()
stdoutLogger := stdoutLogger.withPName(procCfg.Name)
stderrLogger := stderrLogger.withPName(procCfg.Name)
sysLogger := sysLogger.withPName(procCfg.Name)
sysLogger.Println("starting process")
defer sysLogger.Println("stopped process handler")
RunProcess(
ctx, stdoutLogger, stderrLogger, sysLogger, procCfg,
)
}(cfgProc)
}
}

View File

@ -1,6 +1,4 @@
// Package pmuxproc implements the process management aspects of the pmux package pmuxlib
// process.
package pmuxproc
import ( import (
"bufio" "bufio"
@ -15,8 +13,11 @@ import (
"time" "time"
) )
// Config is used to configure a process via RunProcess. // ProcessConfig is used to configure a process via RunProcess.
type Config struct { type ProcessConfig struct {
// Name of the process to be run. This only gets used by RunPmux.
Name string
// Cmd and Args describe the actual process to run. // Cmd and Args describe the actual process to run.
Cmd string `yaml:"cmd"` Cmd string `yaml:"cmd"`
@ -48,7 +49,7 @@ type Config struct {
NoRestartOn []int `yaml:"noRestartOn"` NoRestartOn []int `yaml:"noRestartOn"`
} }
func (cfg Config) withDefaults() Config { func (cfg ProcessConfig) withDefaults() ProcessConfig {
if cfg.MinWait == 0 { if cfg.MinWait == 0 {
cfg.MinWait = 1 * time.Second cfg.MinWait = 1 * time.Second
@ -65,10 +66,10 @@ func (cfg Config) withDefaults() Config {
return cfg return cfg
} }
// RunProcessOnce runs the process described by the Config (though it doesn't // RunProcessOnce runs the process described by the ProcessConfig (though it
// use all fields from the Config). The process is killed if the context is // doesn't use all fields from the ProcessConfig). The process is killed if the
// canceled. The exit status of the process is returned, or -1 if the process // context is canceled. The exit status of the process is returned, or -1 if the
// was never started. // process was never started.
// //
// It returns nil if the process exits normally with a zero status. It returns // It returns nil if the process exits normally with a zero status. It returns
// an error otherwise. // an error otherwise.
@ -78,7 +79,7 @@ func (cfg Config) withDefaults() Config {
func RunProcessOnce( func RunProcessOnce(
ctx context.Context, ctx context.Context,
stdoutLogger, stderrLogger, sysLogger Logger, stdoutLogger, stderrLogger, sysLogger Logger,
cfg Config, cfg ProcessConfig,
) ( ) (
int, error, int, error,
) { ) {
@ -165,8 +166,8 @@ func RunProcessOnce(
return exitCode, nil return exitCode, nil
} }
// RunProcess is a process (configured by Config) until the context is canceled, // RunProcess is a process (configured by ProcessConfig) until the context is
// at which point the process is killed and RunProcess returns. // canceled, at which point the process is killed and RunProcess returns.
// //
// The process will be restarted if it exits of its own accord. There will be a // The process will be restarted if it exits of its own accord. There will be a
// brief wait time between each restart, with an exponential backup mechanism so // brief wait time between each restart, with an exponential backup mechanism so
@ -177,7 +178,7 @@ func RunProcessOnce(
func RunProcess( func RunProcess(
ctx context.Context, ctx context.Context,
stdoutLogger, stderrLogger, sysLogger Logger, stdoutLogger, stderrLogger, sysLogger Logger,
cfg Config, cfg ProcessConfig,
) { ) {
cfg = cfg.withDefaults() cfg = cfg.withDefaults()

View File

@ -1,33 +0,0 @@
package pmuxproc
import (
"fmt"
"io"
)
// Logger is used by RunProcess to log process details in realtime. You can use
// a new(NullLogger) if you don't care.
type Logger interface {
Println(string)
Printf(string, ...interface{})
}
// NullLogger is an implementation of Logger which doesn't do anything.
type NullLogger struct{}
func (*NullLogger) Println(string) {}
func (*NullLogger) Printf(string, ...interface{}) {}
// PlainLogger implements Logger by writing each line directly to the given
// io.Writer as-is.
type PlainLogger struct {
io.Writer
}
func (l PlainLogger) Println(line string) {
fmt.Fprintln(l, line)
}
func (l PlainLogger) Printf(str string, args ...interface{}) {
fmt.Fprintf(l, str, args...)
}