Compare commits
No commits in common. "5660a9a623eabfdbc61f404a8a2d467ae9d406c8" and "f5fce902e8c42b80b8e35e49d136f46134f93663" have entirely different histories.
5660a9a623
...
f5fce902e8
@ -6,11 +6,12 @@
|
|||||||
},
|
},
|
||||||
}: let
|
}: let
|
||||||
pkgs = (import pkgsSrc) {};
|
pkgs = (import pkgsSrc) {};
|
||||||
in pkgs.mkShell {
|
in {
|
||||||
|
shell = pkgs.mkShell {
|
||||||
name = "project-shell";
|
name = "project-shell";
|
||||||
buildInputs = [
|
buildInputs = [
|
||||||
pkgs.go
|
pkgs.go
|
||||||
pkgs.golangci-lint
|
pkgs.golangci-lint
|
||||||
pkgs.gopls
|
|
||||||
];
|
];
|
||||||
|
};
|
||||||
}
|
}
|
3
main.go
3
main.go
@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
@ -17,7 +18,7 @@ func main() {
|
|||||||
cfgPath := flag.String("c", "./pmux.yml", "Path to config yaml file")
|
cfgPath := flag.String("c", "./pmux.yml", "Path to config yaml file")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
cfgB, err := os.ReadFile(*cfgPath)
|
cfgB, err := ioutil.ReadFile(*cfgPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("couldn't read cfg file at %q: %v", *cfgPath, err))
|
panic(fmt.Sprintf("couldn't read cfg file at %q: %v", *cfgPath, err))
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -12,12 +13,11 @@ import (
|
|||||||
// pname used by pmux itself for logging.
|
// pname used by pmux itself for logging.
|
||||||
const pmuxPName = "pmux"
|
const pmuxPName = "pmux"
|
||||||
|
|
||||||
// Characters used to denote different kinds of logs in the default Pmux
|
// characters used to denote different kinds of logs
|
||||||
// configuration.
|
|
||||||
const (
|
const (
|
||||||
LogSepStdout = '›'
|
logSepStdout = '›'
|
||||||
LogSepStderr = '»'
|
logSepStderr = '»'
|
||||||
LogSepSys = '~'
|
logSepSys = '~'
|
||||||
)
|
)
|
||||||
|
|
||||||
// Logger is used by RunProcess to log process details in realtime. You can use
|
// Logger is used by RunProcess to log process details in realtime. You can use
|
||||||
@ -25,26 +25,20 @@ const (
|
|||||||
type Logger interface {
|
type Logger interface {
|
||||||
Println(string)
|
Println(string)
|
||||||
Printf(string, ...interface{})
|
Printf(string, ...interface{})
|
||||||
Close() error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NullLogger is an implementation of Logger which doesn't do anything.
|
// NullLogger is an implementation of Logger which doesn't do anything.
|
||||||
type NullLogger struct{}
|
type NullLogger struct{}
|
||||||
|
|
||||||
var _ Logger = NullLogger{}
|
func (*NullLogger) Println(string) {}
|
||||||
|
func (*NullLogger) Printf(string, ...interface{}) {}
|
||||||
func (NullLogger) Println(string) {}
|
|
||||||
func (NullLogger) Printf(string, ...interface{}) {}
|
|
||||||
func (NullLogger) Close() error { return nil }
|
|
||||||
|
|
||||||
// PlainLogger implements Logger by writing each line directly to the given
|
// PlainLogger implements Logger by writing each line directly to the given
|
||||||
// io.Writer as-is.
|
// io.Writer as-is.
|
||||||
type PlainLogger struct {
|
type PlainLogger struct {
|
||||||
io.WriteCloser
|
io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Logger = PlainLogger{}
|
|
||||||
|
|
||||||
func (l PlainLogger) Println(line string) {
|
func (l PlainLogger) Println(line string) {
|
||||||
fmt.Fprintln(l, line)
|
fmt.Fprintln(l, line)
|
||||||
}
|
}
|
||||||
@ -53,15 +47,7 @@ func (l PlainLogger) Printf(str string, args ...interface{}) {
|
|||||||
fmt.Fprintf(l, str, args...)
|
fmt.Fprintf(l, str, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l PlainLogger) Close() error {
|
type logger struct {
|
||||||
return l.WriteCloser.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// PmuxLogger is used by the pmux process itself for logging. It can prefix log
|
|
||||||
// lines with a timestamp, the name of the process being logged, and a custom
|
|
||||||
// separator in front of the log line to help delineate one kind of log from
|
|
||||||
// another.
|
|
||||||
type PmuxLogger struct {
|
|
||||||
timeFmt string
|
timeFmt string
|
||||||
|
|
||||||
l *sync.Mutex
|
l *sync.Mutex
|
||||||
@ -75,21 +61,16 @@ type PmuxLogger struct {
|
|||||||
sep rune
|
sep rune
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Logger = (*PmuxLogger)(nil)
|
func newLogger(
|
||||||
|
out io.Writer,
|
||||||
|
sep rune,
|
||||||
|
timeFmt string,
|
||||||
|
) *logger {
|
||||||
|
|
||||||
// NewPmuxLogger returns a PmuxLogger which will write lines to the given
|
pname := pmuxPName
|
||||||
// io.Writer, separating metadata from the line itself using the given
|
maxPNameLen := uint64(len(pname))
|
||||||
// separator.
|
|
||||||
//
|
|
||||||
// If timeFmt is not empty string then the timestamp of each line will be output
|
|
||||||
// using that format.
|
|
||||||
func NewPmuxLogger(out io.Writer, sep rune, timeFmt string) *PmuxLogger {
|
|
||||||
var (
|
|
||||||
pname = pmuxPName
|
|
||||||
maxPNameLen = uint64(len(pname))
|
|
||||||
)
|
|
||||||
|
|
||||||
return &PmuxLogger{
|
l := &logger{
|
||||||
timeFmt: timeFmt,
|
timeFmt: timeFmt,
|
||||||
maxPNameLen: &maxPNameLen,
|
maxPNameLen: &maxPNameLen,
|
||||||
l: new(sync.Mutex),
|
l: new(sync.Mutex),
|
||||||
@ -98,23 +79,17 @@ func NewPmuxLogger(out io.Writer, sep rune, timeFmt string) *PmuxLogger {
|
|||||||
pname: pname,
|
pname: pname,
|
||||||
sep: sep,
|
sep: sep,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithSep returns a copy of the PmuxLogger which uses the given separator. The
|
func (l *logger) withSep(sep rune) *logger {
|
||||||
// original PmuxLogger will continue to function normally.
|
|
||||||
func (l *PmuxLogger) WithSep(sep rune) *PmuxLogger {
|
|
||||||
l2 := *l
|
l2 := *l
|
||||||
l2.sep = sep
|
l2.sep = sep
|
||||||
return &l2
|
return &l2
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithProcessName returns a copy of the PmuxLogger which will prefix log lines
|
func (l *logger) withPName(pname string) *logger {
|
||||||
// with the given process name.
|
|
||||||
//
|
|
||||||
// All PmuxLoggers which are spawned using a With* method, including the
|
|
||||||
// original, will left pad their process name to the length of the longest
|
|
||||||
// process name in the group.
|
|
||||||
func (l *PmuxLogger) WithProcessName(pname string) *PmuxLogger {
|
|
||||||
l2 := *l
|
l2 := *l
|
||||||
l2.pname = pname
|
l2.pname = pname
|
||||||
|
|
||||||
@ -128,26 +103,27 @@ func (l *PmuxLogger) WithProcessName(pname string) *PmuxLogger {
|
|||||||
return &l2
|
return &l2
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close flushes all buffered log data, and sets the logger to discard all
|
func (l *logger) Close() {
|
||||||
// future print calls.
|
|
||||||
//
|
|
||||||
// PmuxLoggers created via With* methods should be each closed individually.
|
|
||||||
func (l *PmuxLogger) Close() error {
|
|
||||||
l.l.Lock()
|
l.l.Lock()
|
||||||
defer l.l.Unlock()
|
defer l.l.Unlock()
|
||||||
|
|
||||||
err := l.outBuf.Flush()
|
l.outBuf.Flush()
|
||||||
|
|
||||||
|
if syncer, ok := l.out.(interface{ Sync() error }); ok {
|
||||||
|
_ = syncer.Sync()
|
||||||
|
} else if flusher, ok := l.out.(interface{ Flush() error }); ok {
|
||||||
|
_ = flusher.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
// this generally shouldn't be necessary, but we could run into cases (e.g.
|
// this generally shouldn't be necessary, but we could run into cases (e.g.
|
||||||
// during a force-kill) where further Prints are called after a Close. These
|
// during a force-kill) where further Prints are called after a Close. These
|
||||||
// should just do nothing.
|
// should just do nothing.
|
||||||
l.out = io.Discard
|
l.out = ioutil.Discard
|
||||||
l.outBuf = bufio.NewWriter(l.out)
|
l.outBuf = bufio.NewWriter(l.out)
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *PmuxLogger) println(line string) {
|
func (l *logger) println(line string) {
|
||||||
|
|
||||||
l.l.Lock()
|
l.l.Lock()
|
||||||
defer l.l.Unlock()
|
defer l.l.Unlock()
|
||||||
@ -173,10 +149,10 @@ func (l *PmuxLogger) println(line string) {
|
|||||||
l.outBuf.Flush()
|
l.outBuf.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *PmuxLogger) Println(line string) {
|
func (l *logger) Println(line string) {
|
||||||
l.println(line)
|
l.println(line)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *PmuxLogger) Printf(msg string, args ...interface{}) {
|
func (l *logger) Printf(msg string, args ...interface{}) {
|
||||||
l.Println(fmt.Sprintf(msg, args...))
|
l.Println(fmt.Sprintf(msg, args...))
|
||||||
}
|
}
|
||||||
|
@ -22,30 +22,31 @@ type Config struct {
|
|||||||
// a leftover zombie child process.
|
// a leftover zombie child process.
|
||||||
type Pmux struct {
|
type Pmux struct {
|
||||||
processes map[string]*Process
|
processes map[string]*Process
|
||||||
|
sysLogger Logger
|
||||||
stdoutLogger, stderrLogger, sysLogger Logger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPmux starts a Pmux with the given configuration.
|
// NewPmux starts a Pmux with the given configuration.
|
||||||
func NewPmux(cfg Config, stdout, stderr io.Writer) *Pmux {
|
func NewPmux(cfg Config, stdout, stderr io.Writer) *Pmux {
|
||||||
var (
|
stdoutLogger := newLogger(stdout, logSepStdout, cfg.TimeFormat)
|
||||||
stdoutLogger = NewPmuxLogger(stdout, LogSepStdout, cfg.TimeFormat)
|
defer stdoutLogger.Close()
|
||||||
stderrLogger = NewPmuxLogger(stderr, LogSepStderr, cfg.TimeFormat)
|
|
||||||
sysLogger = stderrLogger.WithSep(LogSepSys)
|
|
||||||
|
|
||||||
p = &Pmux{
|
stderrLogger := newLogger(stderr, logSepStderr, cfg.TimeFormat)
|
||||||
|
defer stderrLogger.Close()
|
||||||
|
|
||||||
|
sysLogger := stderrLogger.withSep(logSepSys)
|
||||||
|
|
||||||
|
p := &Pmux{
|
||||||
processes: map[string]*Process{},
|
processes: map[string]*Process{},
|
||||||
stdoutLogger: stdoutLogger,
|
|
||||||
stderrLogger: stderrLogger,
|
|
||||||
sysLogger: sysLogger,
|
sysLogger: sysLogger,
|
||||||
}
|
}
|
||||||
)
|
|
||||||
|
|
||||||
for name, cfgProc := range cfg.Processes {
|
for name, cfgProc := range cfg.Processes {
|
||||||
cfgProc.StdoutLogger = stdoutLogger.WithProcessName(name)
|
stdoutLogger := stdoutLogger.withPName(name)
|
||||||
cfgProc.StderrLogger = stderrLogger.WithProcessName(name)
|
stderrLogger := stderrLogger.withPName(name)
|
||||||
cfgProc.SysLogger = sysLogger.WithProcessName(name)
|
sysLogger := sysLogger.withPName(name)
|
||||||
p.processes[name] = NewProcess(cfgProc)
|
p.processes[name] = NewProcess(
|
||||||
|
cfgProc, stdoutLogger, stderrLogger, sysLogger,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
return p
|
return p
|
||||||
@ -96,14 +97,5 @@ func (p *Pmux) Stop() {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.stdoutLogger.Close(); err != nil {
|
|
||||||
p.sysLogger.Printf("Closing stdout logger failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p.stderrLogger.Close(); err != nil {
|
|
||||||
p.sysLogger.Printf("Closing stderr logger failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
p.sysLogger.Println("exited gracefully, ciao!")
|
p.sysLogger.Println("exited gracefully, ciao!")
|
||||||
_ = p.sysLogger.Close()
|
|
||||||
}
|
}
|
||||||
|
@ -56,12 +56,7 @@ type ProcessConfig struct {
|
|||||||
// Processes will not be shut down until all processes with a higher group
|
// Processes will not be shut down until all processes with a higher group
|
||||||
// number are already shut down. Processes with the same group number will
|
// number are already shut down. Processes with the same group number will
|
||||||
// be shut down simultaneously.
|
// be shut down simultaneously.
|
||||||
Group int `yaml:"group"`
|
Group int
|
||||||
|
|
||||||
// Where to send stdout, stderr log lines, and lines from pmux itself.
|
|
||||||
StdoutLogger Logger `yaml:"-"`
|
|
||||||
StderrLogger Logger `yaml:"-"`
|
|
||||||
SysLogger Logger `yaml:"-"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cfg ProcessConfig) withDefaults() ProcessConfig {
|
func (cfg ProcessConfig) withDefaults() ProcessConfig {
|
||||||
@ -108,7 +103,13 @@ func sigProcessGroup(sysLogger Logger, proc *os.Process, sig syscall.Signal) {
|
|||||||
//
|
//
|
||||||
// The stdout and stderr of the process will be written to the corresponding
|
// The stdout and stderr of the process will be written to the corresponding
|
||||||
// Loggers. Various runtime events will be written to the sysLogger.
|
// Loggers. Various runtime events will be written to the sysLogger.
|
||||||
func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {
|
func RunProcessOnce(
|
||||||
|
ctx context.Context,
|
||||||
|
stdoutLogger, stderrLogger, sysLogger Logger,
|
||||||
|
cfg ProcessConfig,
|
||||||
|
) (
|
||||||
|
int, error,
|
||||||
|
) {
|
||||||
|
|
||||||
cfg = cfg.withDefaults()
|
cfg = cfg.withDefaults()
|
||||||
|
|
||||||
@ -168,8 +169,8 @@ func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {
|
|||||||
}
|
}
|
||||||
defer stderr.Close()
|
defer stderr.Close()
|
||||||
|
|
||||||
fwdOutPipe(cfg.StdoutLogger, stdout)
|
fwdOutPipe(stdoutLogger, stdout)
|
||||||
fwdOutPipe(cfg.StderrLogger, stderr)
|
fwdOutPipe(stderrLogger, stderr)
|
||||||
|
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
return -1, fmt.Errorf("starting process: %w", err)
|
return -1, fmt.Errorf("starting process: %w", err)
|
||||||
@ -181,14 +182,14 @@ func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
sigProcessGroup(cfg.SysLogger, proc, syscall.SIGINT)
|
sigProcessGroup(sysLogger, proc, syscall.SIGINT)
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(cfg.SigKillWait):
|
case <-time.After(cfg.SigKillWait):
|
||||||
sigProcessGroup(cfg.SysLogger, proc, syscall.SIGKILL)
|
sigProcessGroup(sysLogger, proc, syscall.SIGKILL)
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,14 +225,21 @@ func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {
|
|||||||
// Loggers. Various runtime events will be written to the sysLogger.
|
// Loggers. Various runtime events will be written to the sysLogger.
|
||||||
type Process struct {
|
type Process struct {
|
||||||
cfg ProcessConfig
|
cfg ProcessConfig
|
||||||
|
stdoutLogger, stderrLogger, sysLogger Logger
|
||||||
|
|
||||||
stopFn context.CancelFunc
|
stopFn context.CancelFunc
|
||||||
doneCh chan struct{}
|
doneCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewProcess returns a new Process instance based on the given config.
|
// NewProcess returns a new Process instance based on the given config.
|
||||||
func NewProcess(cfg ProcessConfig) *Process {
|
func NewProcess(
|
||||||
|
cfg ProcessConfig, stdoutLogger, stderrLogger, sysLogger Logger,
|
||||||
|
) *Process {
|
||||||
p := &Process{
|
p := &Process{
|
||||||
cfg: cfg.withDefaults(),
|
cfg: cfg.withDefaults(),
|
||||||
|
stdoutLogger: stdoutLogger,
|
||||||
|
stderrLogger: stderrLogger,
|
||||||
|
sysLogger: sysLogger,
|
||||||
}
|
}
|
||||||
p.run()
|
p.run()
|
||||||
return p
|
return p
|
||||||
@ -253,14 +261,18 @@ func (p *Process) restartLoop(ctx context.Context) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
exitCode, err := RunProcessOnce(ctx, p.cfg)
|
exitCode, err := RunProcessOnce(
|
||||||
|
ctx,
|
||||||
|
p.stdoutLogger, p.stderrLogger, p.sysLogger,
|
||||||
|
p.cfg,
|
||||||
|
)
|
||||||
took := time.Since(start)
|
took := time.Since(start)
|
||||||
|
|
||||||
// TODO check if error was due to StartAfterFunc, change the log if so.
|
// TODO check if error was due to StartAfterFunc, change the log if so.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.cfg.SysLogger.Printf("exited: %v", err)
|
p.sysLogger.Printf("exited: %v", err)
|
||||||
} else {
|
} else {
|
||||||
p.cfg.SysLogger.Printf("exit code: %d", exitCode)
|
p.sysLogger.Printf("exit code: %d", exitCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ctx.Err(); err != nil {
|
if err := ctx.Err(); err != nil {
|
||||||
@ -281,7 +293,7 @@ func (p *Process) restartLoop(ctx context.Context) {
|
|||||||
wait = p.cfg.MaxWait
|
wait = p.cfg.MaxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
p.cfg.SysLogger.Printf("will restart process in %v", wait)
|
p.sysLogger.Printf("will restart process in %v", wait)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(wait):
|
case <-time.After(wait):
|
||||||
|
Loading…
Reference in New Issue
Block a user