break pmuxproc out, make logger better, and properly set import path in go.mod

This commit is contained in:
Brian Picciano 2022-01-22 20:03:13 -07:00
parent 56635e63aa
commit fd793d4f81
5 changed files with 265 additions and 188 deletions

4
go.mod
View File

@ -1,5 +1,5 @@
module pmux
module github.com/cryptic-io/pmux
go 1.16
require gopkg.in/yaml.v2 v2.4.0 // indirect
require gopkg.in/yaml.v2 v2.4.0

1
go.sum
View File

@ -1,3 +1,4 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

246
main.go
View File

@ -9,13 +9,14 @@ import (
"io"
"io/ioutil"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/cryptic-io/pmux/pmuxproc"
"gopkg.in/yaml.v2"
)
@ -30,37 +31,39 @@ const (
type logger struct {
timeFmt string
maxPNameLen int
l sync.Mutex
l *sync.Mutex
out io.Writer
outBuf *bufio.Writer
wg sync.WaitGroup
// maxPNameLen is a pointer because it changes when WithPrefix is called.
maxPNameLen *uint64
wg *sync.WaitGroup
closeCh chan struct{}
pname string
sep rune
}
func newLogger(
out io.Writer,
timeFmt string,
possiblePNames []string,
) *logger {
maxPNameLen := 0
for _, pname := range possiblePNames {
if l := len(pname); l > maxPNameLen {
maxPNameLen = l
}
}
maxPNameLen++
pname := pmuxPName
maxPNameLen := uint64(len(pname))
l := &logger{
timeFmt: timeFmt,
maxPNameLen: maxPNameLen,
maxPNameLen: &maxPNameLen,
l: new(sync.Mutex),
out: out,
outBuf: bufio.NewWriter(out),
wg: new(sync.WaitGroup),
closeCh: make(chan struct{}),
pname: pname,
sep: logSepSys,
}
l.wg.Add(1)
@ -72,6 +75,21 @@ func newLogger(
return l
}
func (l *logger) WithPrefix(pname string, sep rune) *logger {
l2 := *l
l2.pname = pname
l2.sep = sep
l2.l.Lock()
defer l2.l.Unlock()
if pnameLen := uint64(len(pname)); pnameLen > *l2.maxPNameLen {
*l2.maxPNameLen = pnameLen
}
return &l2
}
func (l *logger) Close() {
l.l.Lock()
@ -83,9 +101,9 @@ func (l *logger) Close() {
l.outBuf.Flush()
if syncer, ok := l.out.(interface{ Sync() error }); ok {
syncer.Sync()
_ = syncer.Sync()
} else if flusher, ok := l.out.(interface{ Flush() error }); ok {
flusher.Flush()
_ = flusher.Flush()
}
// this generally shouldn't be necessary, but we could run into cases (e.g.
@ -96,6 +114,7 @@ func (l *logger) Close() {
}
func (l *logger) flusher() {
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
@ -111,7 +130,7 @@ func (l *logger) flusher() {
}
}
func (l *logger) Println(pname string, sep rune, line string) {
func (l *logger) println(line string) {
l.l.Lock()
defer l.l.Unlock()
@ -120,160 +139,34 @@ func (l *logger) Println(pname string, sep rune, line string) {
l.outBuf,
"%s %c %s%s%c %s\n",
time.Now().Format(l.timeFmt),
sep,
pname,
strings.Repeat(" ", l.maxPNameLen-len(pname)),
sep,
l.sep,
l.pname,
strings.Repeat(" ", int(*l.maxPNameLen+1)-len(l.pname)),
l.sep,
line,
)
}
func (l *logger) Printf(pname string, sep rune, msg string, args ...interface{}) {
l.Println(pname, sep, fmt.Sprintf(msg, args...))
func (l *logger) Println(line string) {
l.println(line)
}
////////////////////////////////////////////////////////////////////////////////
func (l *logger) Printf(msg string, args ...interface{}) {
l.Println(fmt.Sprintf(msg, args...))
}
type configProcess struct {
type processConfig struct {
pmuxproc.Config `yaml:",inline"`
Name string `yaml:"name"`
Cmd string `yaml:"cmd"`
Args []string `yaml:"args"`
Env map[string]string `yaml:"env"`
MinWait time.Duration `yaml:"minWait"`
MaxWait time.Duration `yaml:"maxWait"`
SigKillWait time.Duration `yaml:"sigKillWait"`
}
func runProcessOnce(
ctx context.Context, logger *logger, cfgProc configProcess,
) error {
var wg sync.WaitGroup
fwdOutPipe := func(r io.Reader) {
wg.Add(1)
go func() {
defer wg.Done()
bufR := bufio.NewReader(r)
for {
line, err := bufR.ReadString('\n')
if errors.Is(err, io.EOF) {
return
} else if err != nil {
logger.Printf(cfgProc.Name, logSepSys, "reading output: %v", err)
return
}
logger.Println(cfgProc.Name, logSepProcOut, strings.TrimSuffix(line, "\n"))
}
}()
}
cmd := exec.Command(cfgProc.Cmd, cfgProc.Args...)
cmd.Env = make([]string, 0, len(cfgProc.Env))
for k, v := range cfgProc.Env {
cmd.Env = append(cmd.Env, k+"="+v)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("getting stdout pipe: %w", err)
}
defer stdout.Close()
stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("getting stderr pipe: %w", err)
}
defer stderr.Close()
fwdOutPipe(stdout)
fwdOutPipe(stderr)
if err := cmd.Start(); err != nil {
return fmt.Errorf("starting process: %w", err)
}
// go-routine which will sent interrupt if the context is cancelled. Also
// waits on a secondary channel, which is closed when this function returns,
// in order to ensure this go-routine always gets cleaned up.
stopCh := make(chan struct{})
defer close(stopCh)
go func(proc *os.Process) {
select {
case <-ctx.Done():
proc.Signal(os.Interrupt)
case <-stopCh:
return
}
select {
case <-time.After(cfgProc.SigKillWait):
logger.Println(cfgProc.Name, logSepSys, "forcefully killing process")
proc.Signal(os.Kill)
case <-stopCh:
return
}
}(cmd.Process)
wg.Wait()
if err := cmd.Wait(); err != nil {
return fmt.Errorf("process exited: %w", err)
}
return nil
}
func runProcess(
ctx context.Context, logger *logger, cfgProc configProcess,
) {
var wait time.Duration
for {
logger.Println(cfgProc.Name, logSepSys, "starting process")
start := time.Now()
err := runProcessOnce(ctx, logger, cfgProc)
took := time.Since(start)
if err != nil {
logger.Printf(cfgProc.Name, logSepSys, "%v", err)
} else {
logger.Println(cfgProc.Name, logSepSys, "exit status 0")
}
if err := ctx.Err(); err != nil {
return
}
wait = ((wait * 2) - took).Truncate(time.Millisecond)
if wait < cfgProc.MinWait {
wait = cfgProc.MinWait
} else if wait > cfgProc.MaxWait {
wait = cfgProc.MaxWait
}
logger.Printf(cfgProc.Name, logSepSys, "will restart process in %v", wait)
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
}
////////////////////////////////////////////////////////////////////////////////
type config struct {
TimeFormat string `yaml:"timeFormat"`
Processes []configProcess `yaml:"processes"`
Processes []processConfig `yaml:"processes"`
}
func (cfg config) init() (config, error) {
if cfg.TimeFormat == "" {
cfg.TimeFormat = "2006-01-02T15:04:05.000Z07:00"
}
@ -282,22 +175,6 @@ func (cfg config) init() (config, error) {
return config{}, errors.New("no processes defined")
}
for i, cfgProc := range cfg.Processes {
if cfgProc.MinWait == 0 {
cfgProc.MinWait = 1 * time.Second
}
if cfgProc.MaxWait == 0 {
cfgProc.MaxWait = 64 * time.Second
}
if cfgProc.SigKillWait == 0 {
cfgProc.SigKillWait = 10 * time.Second
}
cfg.Processes[i] = cfgProc
}
return cfg, nil
}
@ -319,26 +196,21 @@ func main() {
panic(fmt.Sprintf("initializing config: %v", err))
}
pnames := make([]string, len(cfg.Processes))
for i, cfgProc := range cfg.Processes {
pnames[i] = cfgProc.Name
}
logger := newLogger(os.Stdout, cfg.TimeFormat, pnames)
logger := newLogger(os.Stdout, cfg.TimeFormat)
defer logger.Close()
defer logger.Println(pmuxPName, logSepSys, "exited gracefully, ciao!")
defer logger.Println("exited gracefully, ciao!")
ctx, cancel := context.WithCancel(context.Background())
go func() {
sigCh := make(chan os.Signal)
sigCh := make(chan os.Signal, 2)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
sig := <-sigCh
logger.Printf(pmuxPName, logSepSys, "%v signal received, killing all sub-processes", sig)
logger.Printf("%v signal received, killing all sub-processes", sig)
cancel()
<-sigCh
logger.Printf(pmuxPName, logSepSys, "forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!")
logger.Printf("forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!")
logger.Close()
os.Exit(1)
}()
@ -348,10 +220,14 @@ func main() {
for _, cfgProc := range cfg.Processes {
wg.Add(1)
go func(cfgProc configProcess) {
go func(procCfg processConfig) {
defer wg.Done()
runProcess(ctx, logger, cfgProc)
logger.Printf(cfgProc.Name, logSepSys, "stopped process handler")
logger := logger.WithPrefix(procCfg.Name, logSepProcOut)
defer logger.Printf("stopped process handler")
logger.Println("starting process")
pmuxproc.RunProcess(ctx, logger, procCfg.Config)
}(cfgProc)
}
}

14
pmuxproc/logger.go Normal file
View File

@ -0,0 +1,14 @@
package pmuxproc
// Logger is used by RunProcess to log process details in realtime. You can use
// a new(NullLogger) if you don't care.
type Logger interface {
Println(string)
Printf(string, ...interface{})
}
// NullLogger is an implementation of Logger which doesn't do anything.
type NullLogger struct{}
func (*NullLogger) Println(string) {}
func (*NullLogger) Printf(string, ...interface{}) {}

186
pmuxproc/pmuxproc.go Normal file
View File

@ -0,0 +1,186 @@
// Package pmuxproc implements the process management aspects of the pmux
// process.
package pmuxproc
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strings"
"sync"
"time"
)
// Config is used to configure a process via RunProcess.
type Config struct {
// Cmd and Args describe the actual process to run.
Cmd string `yaml:"cmd"`
Args []string `yaml:"args"`
// Env describes the environment variables to set on the process.
Env map[string]string `yaml:"env"`
// MinWait and MaxWait are the minimum and maximum amount of time between
// restarts that RunProcess will wait.
//
// MinWait defaults to 1 second.
// MaxWait defaults to 64 seconds.
MinWait time.Duration `yaml:"minWait"`
MaxWait time.Duration `yaml:"maxWait"`
// SigKillWait is the amount of time after the process is sent a SIGINT
// before RunProcess sends it a SIGKILL.
//
// Defalts to 10 seconds.
SigKillWait time.Duration `yaml:"sigKillWait"`
}
func (cfg Config) withDefaults() Config {
if cfg.MinWait == 0 {
cfg.MinWait = 1 * time.Second
}
if cfg.MaxWait == 0 {
cfg.MaxWait = 64 * time.Second
}
if cfg.SigKillWait == 0 {
cfg.SigKillWait = 10 * time.Second
}
return cfg
}
func runProcessOnce(ctx context.Context, logger Logger, cfg Config) error {
var wg sync.WaitGroup
fwdOutPipe := func(r io.Reader) {
wg.Add(1)
go func() {
defer wg.Done()
bufR := bufio.NewReader(r)
for {
line, err := bufR.ReadString('\n')
if errors.Is(err, io.EOF) {
return
} else if err != nil {
logger.Printf("reading output: %v", err)
return
}
logger.Println(strings.TrimSuffix(line, "\n"))
}
}()
}
cmd := exec.Command(cfg.Cmd, cfg.Args...)
cmd.Env = make([]string, 0, len(cfg.Env))
for k, v := range cfg.Env {
cmd.Env = append(cmd.Env, k+"="+v)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("getting stdout pipe: %w", err)
}
defer stdout.Close()
stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("getting stderr pipe: %w", err)
}
defer stderr.Close()
fwdOutPipe(stdout)
fwdOutPipe(stderr)
if err := cmd.Start(); err != nil {
return fmt.Errorf("starting process: %w", err)
}
// go-routine which will sent interrupt if the context is cancelled. Also
// waits on a secondary channel, which is closed when this function returns,
// in order to ensure this go-routine always gets cleaned up.
stopCh := make(chan struct{})
defer close(stopCh)
go func(proc *os.Process) {
select {
case <-ctx.Done():
_ = proc.Signal(os.Interrupt)
case <-stopCh:
return
}
select {
case <-time.After(cfg.SigKillWait):
logger.Println("forcefully killing process")
_ = proc.Signal(os.Kill)
case <-stopCh:
return
}
}(cmd.Process)
wg.Wait()
if err := cmd.Wait(); err != nil {
return fmt.Errorf("process exited: %w", err)
}
return nil
}
// RunProcess is a process (configured by Config) until the context is canceled,
// at which point the process is killed and RunProcess returns.
//
// The process will be restarted if it exits of its own accord. There will be a
// brief wait time between each restart, with an exponential backup mechanism so
// that the wait time increases upon repeated restarts.
//
// The stdout and stderr of the process will be written to the given Logger, as
// well as various runtime events.
func RunProcess(ctx context.Context, logger Logger, cfg Config) {
cfg = cfg.withDefaults()
var wait time.Duration
for {
start := time.Now()
err := runProcessOnce(ctx, logger, cfg)
took := time.Since(start)
if err != nil {
logger.Printf("%v", err)
} else {
logger.Println("exit status 0")
}
if err := ctx.Err(); err != nil {
return
}
wait = ((wait * 2) - took).Truncate(time.Millisecond)
if wait < cfg.MinWait {
wait = cfg.MinWait
} else if wait > cfg.MaxWait {
wait = cfg.MaxWait
}
logger.Printf("will restart process in %v", wait)
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
}