Compare commits

..

No commits in common. "5660a9a623eabfdbc61f404a8a2d467ae9d406c8" and "f5fce902e8c42b80b8e35e49d136f46134f93663" have entirely different histories.

5 changed files with 92 additions and 110 deletions

View File

@ -6,11 +6,12 @@
}, },
}: let }: let
pkgs = (import pkgsSrc) {}; pkgs = (import pkgsSrc) {};
in pkgs.mkShell { in {
name = "project-shell"; shell = pkgs.mkShell {
buildInputs = [ name = "project-shell";
pkgs.go buildInputs = [
pkgs.golangci-lint pkgs.go
pkgs.gopls pkgs.golangci-lint
]; ];
};
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
@ -17,7 +18,7 @@ func main() {
cfgPath := flag.String("c", "./pmux.yml", "Path to config yaml file") cfgPath := flag.String("c", "./pmux.yml", "Path to config yaml file")
flag.Parse() flag.Parse()
cfgB, err := os.ReadFile(*cfgPath) cfgB, err := ioutil.ReadFile(*cfgPath)
if err != nil { if err != nil {
panic(fmt.Sprintf("couldn't read cfg file at %q: %v", *cfgPath, err)) panic(fmt.Sprintf("couldn't read cfg file at %q: %v", *cfgPath, err))
} }

View File

@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -12,12 +13,11 @@ import (
// pname used by pmux itself for logging. // pname used by pmux itself for logging.
const pmuxPName = "pmux" const pmuxPName = "pmux"
// Characters used to denote different kinds of logs in the default Pmux // characters used to denote different kinds of logs
// configuration.
const ( const (
LogSepStdout = '' logSepStdout = ''
LogSepStderr = '»' logSepStderr = '»'
LogSepSys = '~' logSepSys = '~'
) )
// Logger is used by RunProcess to log process details in realtime. You can use // Logger is used by RunProcess to log process details in realtime. You can use
@ -25,26 +25,20 @@ const (
type Logger interface { type Logger interface {
Println(string) Println(string)
Printf(string, ...interface{}) Printf(string, ...interface{})
Close() error
} }
// NullLogger is an implementation of Logger which doesn't do anything. // NullLogger is an implementation of Logger which doesn't do anything.
type NullLogger struct{} type NullLogger struct{}
var _ Logger = NullLogger{} func (*NullLogger) Println(string) {}
func (*NullLogger) Printf(string, ...interface{}) {}
func (NullLogger) Println(string) {}
func (NullLogger) Printf(string, ...interface{}) {}
func (NullLogger) Close() error { return nil }
// PlainLogger implements Logger by writing each line directly to the given // PlainLogger implements Logger by writing each line directly to the given
// io.Writer as-is. // io.Writer as-is.
type PlainLogger struct { type PlainLogger struct {
io.WriteCloser io.Writer
} }
var _ Logger = PlainLogger{}
func (l PlainLogger) Println(line string) { func (l PlainLogger) Println(line string) {
fmt.Fprintln(l, line) fmt.Fprintln(l, line)
} }
@ -53,15 +47,7 @@ func (l PlainLogger) Printf(str string, args ...interface{}) {
fmt.Fprintf(l, str, args...) fmt.Fprintf(l, str, args...)
} }
func (l PlainLogger) Close() error { type logger struct {
return l.WriteCloser.Close()
}
// PmuxLogger is used by the pmux process itself for logging. It can prefix log
// lines with a timestamp, the name of the process being logged, and a custom
// separator in front of the log line to help delineate one kind of log from
// another.
type PmuxLogger struct {
timeFmt string timeFmt string
l *sync.Mutex l *sync.Mutex
@ -75,21 +61,16 @@ type PmuxLogger struct {
sep rune sep rune
} }
var _ Logger = (*PmuxLogger)(nil) func newLogger(
out io.Writer,
sep rune,
timeFmt string,
) *logger {
// NewPmuxLogger returns a PmuxLogger which will write lines to the given pname := pmuxPName
// io.Writer, separating metadata from the line itself using the given maxPNameLen := uint64(len(pname))
// separator.
//
// If timeFmt is not empty string then the timestamp of each line will be output
// using that format.
func NewPmuxLogger(out io.Writer, sep rune, timeFmt string) *PmuxLogger {
var (
pname = pmuxPName
maxPNameLen = uint64(len(pname))
)
return &PmuxLogger{ l := &logger{
timeFmt: timeFmt, timeFmt: timeFmt,
maxPNameLen: &maxPNameLen, maxPNameLen: &maxPNameLen,
l: new(sync.Mutex), l: new(sync.Mutex),
@ -98,23 +79,17 @@ func NewPmuxLogger(out io.Writer, sep rune, timeFmt string) *PmuxLogger {
pname: pname, pname: pname,
sep: sep, sep: sep,
} }
return l
} }
// WithSep returns a copy of the PmuxLogger which uses the given separator. The func (l *logger) withSep(sep rune) *logger {
// original PmuxLogger will continue to function normally.
func (l *PmuxLogger) WithSep(sep rune) *PmuxLogger {
l2 := *l l2 := *l
l2.sep = sep l2.sep = sep
return &l2 return &l2
} }
// WithProcessName returns a copy of the PmuxLogger which will prefix log lines func (l *logger) withPName(pname string) *logger {
// with the given process name.
//
// All PmuxLoggers which are spawned using a With* method, including the
// original, will left pad their process name to the length of the longest
// process name in the group.
func (l *PmuxLogger) WithProcessName(pname string) *PmuxLogger {
l2 := *l l2 := *l
l2.pname = pname l2.pname = pname
@ -128,26 +103,27 @@ func (l *PmuxLogger) WithProcessName(pname string) *PmuxLogger {
return &l2 return &l2
} }
// Close flushes all buffered log data, and sets the logger to discard all func (l *logger) Close() {
// future print calls.
//
// PmuxLoggers created via With* methods should be each closed individually.
func (l *PmuxLogger) Close() error {
l.l.Lock() l.l.Lock()
defer l.l.Unlock() defer l.l.Unlock()
err := l.outBuf.Flush() l.outBuf.Flush()
if syncer, ok := l.out.(interface{ Sync() error }); ok {
_ = syncer.Sync()
} else if flusher, ok := l.out.(interface{ Flush() error }); ok {
_ = flusher.Flush()
}
// this generally shouldn't be necessary, but we could run into cases (e.g. // this generally shouldn't be necessary, but we could run into cases (e.g.
// during a force-kill) where further Prints are called after a Close. These // during a force-kill) where further Prints are called after a Close. These
// should just do nothing. // should just do nothing.
l.out = io.Discard l.out = ioutil.Discard
l.outBuf = bufio.NewWriter(l.out) l.outBuf = bufio.NewWriter(l.out)
return err
} }
func (l *PmuxLogger) println(line string) { func (l *logger) println(line string) {
l.l.Lock() l.l.Lock()
defer l.l.Unlock() defer l.l.Unlock()
@ -173,10 +149,10 @@ func (l *PmuxLogger) println(line string) {
l.outBuf.Flush() l.outBuf.Flush()
} }
func (l *PmuxLogger) Println(line string) { func (l *logger) Println(line string) {
l.println(line) l.println(line)
} }
func (l *PmuxLogger) Printf(msg string, args ...interface{}) { func (l *logger) Printf(msg string, args ...interface{}) {
l.Println(fmt.Sprintf(msg, args...)) l.Println(fmt.Sprintf(msg, args...))
} }

View File

@ -22,30 +22,31 @@ type Config struct {
// a leftover zombie child process. // a leftover zombie child process.
type Pmux struct { type Pmux struct {
processes map[string]*Process processes map[string]*Process
sysLogger Logger
stdoutLogger, stderrLogger, sysLogger Logger
} }
// NewPmux starts a Pmux with the given configuration. // NewPmux starts a Pmux with the given configuration.
func NewPmux(cfg Config, stdout, stderr io.Writer) *Pmux { func NewPmux(cfg Config, stdout, stderr io.Writer) *Pmux {
var ( stdoutLogger := newLogger(stdout, logSepStdout, cfg.TimeFormat)
stdoutLogger = NewPmuxLogger(stdout, LogSepStdout, cfg.TimeFormat) defer stdoutLogger.Close()
stderrLogger = NewPmuxLogger(stderr, LogSepStderr, cfg.TimeFormat)
sysLogger = stderrLogger.WithSep(LogSepSys)
p = &Pmux{ stderrLogger := newLogger(stderr, logSepStderr, cfg.TimeFormat)
processes: map[string]*Process{}, defer stderrLogger.Close()
stdoutLogger: stdoutLogger,
stderrLogger: stderrLogger, sysLogger := stderrLogger.withSep(logSepSys)
sysLogger: sysLogger,
} p := &Pmux{
) processes: map[string]*Process{},
sysLogger: sysLogger,
}
for name, cfgProc := range cfg.Processes { for name, cfgProc := range cfg.Processes {
cfgProc.StdoutLogger = stdoutLogger.WithProcessName(name) stdoutLogger := stdoutLogger.withPName(name)
cfgProc.StderrLogger = stderrLogger.WithProcessName(name) stderrLogger := stderrLogger.withPName(name)
cfgProc.SysLogger = sysLogger.WithProcessName(name) sysLogger := sysLogger.withPName(name)
p.processes[name] = NewProcess(cfgProc) p.processes[name] = NewProcess(
cfgProc, stdoutLogger, stderrLogger, sysLogger,
)
} }
return p return p
@ -96,14 +97,5 @@ func (p *Pmux) Stop() {
wg.Wait() wg.Wait()
} }
if err := p.stdoutLogger.Close(); err != nil {
p.sysLogger.Printf("Closing stdout logger failed: %v", err)
}
if err := p.stderrLogger.Close(); err != nil {
p.sysLogger.Printf("Closing stderr logger failed: %v", err)
}
p.sysLogger.Println("exited gracefully, ciao!") p.sysLogger.Println("exited gracefully, ciao!")
_ = p.sysLogger.Close()
} }

View File

@ -56,12 +56,7 @@ type ProcessConfig struct {
// Processes will not be shut down until all processes with a higher group // Processes will not be shut down until all processes with a higher group
// number are already shut down. Processes with the same group number will // number are already shut down. Processes with the same group number will
// be shut down simultaneously. // be shut down simultaneously.
Group int `yaml:"group"` Group int
// Where to send stdout, stderr log lines, and lines from pmux itself.
StdoutLogger Logger `yaml:"-"`
StderrLogger Logger `yaml:"-"`
SysLogger Logger `yaml:"-"`
} }
func (cfg ProcessConfig) withDefaults() ProcessConfig { func (cfg ProcessConfig) withDefaults() ProcessConfig {
@ -108,7 +103,13 @@ func sigProcessGroup(sysLogger Logger, proc *os.Process, sig syscall.Signal) {
// //
// The stdout and stderr of the process will be written to the corresponding // The stdout and stderr of the process will be written to the corresponding
// Loggers. Various runtime events will be written to the sysLogger. // Loggers. Various runtime events will be written to the sysLogger.
func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) { func RunProcessOnce(
ctx context.Context,
stdoutLogger, stderrLogger, sysLogger Logger,
cfg ProcessConfig,
) (
int, error,
) {
cfg = cfg.withDefaults() cfg = cfg.withDefaults()
@ -168,8 +169,8 @@ func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {
} }
defer stderr.Close() defer stderr.Close()
fwdOutPipe(cfg.StdoutLogger, stdout) fwdOutPipe(stdoutLogger, stdout)
fwdOutPipe(cfg.StderrLogger, stderr) fwdOutPipe(stderrLogger, stderr)
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return -1, fmt.Errorf("starting process: %w", err) return -1, fmt.Errorf("starting process: %w", err)
@ -181,14 +182,14 @@ func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
sigProcessGroup(cfg.SysLogger, proc, syscall.SIGINT) sigProcessGroup(sysLogger, proc, syscall.SIGINT)
case <-stopCh: case <-stopCh:
return return
} }
select { select {
case <-time.After(cfg.SigKillWait): case <-time.After(cfg.SigKillWait):
sigProcessGroup(cfg.SysLogger, proc, syscall.SIGKILL) sigProcessGroup(sysLogger, proc, syscall.SIGKILL)
case <-stopCh: case <-stopCh:
} }
@ -223,15 +224,22 @@ func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {
// The stdout and stderr of the process will be written to the corresponding // The stdout and stderr of the process will be written to the corresponding
// Loggers. Various runtime events will be written to the sysLogger. // Loggers. Various runtime events will be written to the sysLogger.
type Process struct { type Process struct {
cfg ProcessConfig cfg ProcessConfig
stdoutLogger, stderrLogger, sysLogger Logger
stopFn context.CancelFunc stopFn context.CancelFunc
doneCh chan struct{} doneCh chan struct{}
} }
// NewProcess returns a new Process instance based on the given config. // NewProcess returns a new Process instance based on the given config.
func NewProcess(cfg ProcessConfig) *Process { func NewProcess(
cfg ProcessConfig, stdoutLogger, stderrLogger, sysLogger Logger,
) *Process {
p := &Process{ p := &Process{
cfg: cfg.withDefaults(), cfg: cfg.withDefaults(),
stdoutLogger: stdoutLogger,
stderrLogger: stderrLogger,
sysLogger: sysLogger,
} }
p.run() p.run()
return p return p
@ -253,14 +261,18 @@ func (p *Process) restartLoop(ctx context.Context) {
for { for {
start := time.Now() start := time.Now()
exitCode, err := RunProcessOnce(ctx, p.cfg) exitCode, err := RunProcessOnce(
ctx,
p.stdoutLogger, p.stderrLogger, p.sysLogger,
p.cfg,
)
took := time.Since(start) took := time.Since(start)
// TODO check if error was due to StartAfterFunc, change the log if so. // TODO check if error was due to StartAfterFunc, change the log if so.
if err != nil { if err != nil {
p.cfg.SysLogger.Printf("exited: %v", err) p.sysLogger.Printf("exited: %v", err)
} else { } else {
p.cfg.SysLogger.Printf("exit code: %d", exitCode) p.sysLogger.Printf("exit code: %d", exitCode)
} }
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
@ -281,7 +293,7 @@ func (p *Process) restartLoop(ctx context.Context) {
wait = p.cfg.MaxWait wait = p.cfg.MaxWait
} }
p.cfg.SysLogger.Printf("will restart process in %v", wait) p.sysLogger.Printf("will restart process in %v", wait)
select { select {
case <-time.After(wait): case <-time.After(wait):