mnet: refactor to use Components

This commit is contained in:
Brian Picciano 2019-06-16 19:15:51 -06:00
parent 7eac20f58b
commit d0ba23ac7b
3 changed files with 52 additions and 41 deletions

View File

@ -8,6 +8,7 @@ import (
"strings" "strings"
"github.com/mediocregopher/mediocre-go-lib/mcfg" "github.com/mediocregopher/mediocre-go-lib/mcfg"
"github.com/mediocregopher/mediocre-go-lib/mcmp"
"github.com/mediocregopher/mediocre-go-lib/mctx" "github.com/mediocregopher/mediocre-go-lib/mctx"
"github.com/mediocregopher/mediocre-go-lib/merr" "github.com/mediocregopher/mediocre-go-lib/merr"
"github.com/mediocregopher/mediocre-go-lib/mlog" "github.com/mediocregopher/mediocre-go-lib/mlog"
@ -21,16 +22,13 @@ type Listener struct {
net.Listener net.Listener
net.PacketConn net.PacketConn
ctx context.Context cmp *mcmp.Component
// If set to true before mrun's stop event is run, the stop event will not
// cause the MListener to be closed.
NoCloseOnStop bool
} }
type listenerOpts struct { type listenerOpts struct {
proto string proto string
defaultAddr string defaultAddr string
closeOnShutdown bool
} }
func (lOpts listenerOpts) isPacketConn() bool { func (lOpts listenerOpts) isPacketConn() bool {
@ -51,67 +49,81 @@ func ListenerProtocol(proto string) ListenerOpt {
} }
} }
// ListenerCloseOnShutdown sets the Listener's behavior when mrun's Shutdown
// event is triggered on its Component. If true the Listener will call Close on
// itself, if false it will do nothing.
//
// Defaults to true.
func ListenerCloseOnShutdown(closeOnShutdown bool) ListenerOpt {
return func(opts *listenerOpts) {
opts.closeOnShutdown = closeOnShutdown
}
}
// ListenerDefaultAddr adjusts the defaultAddr which the Listener will use. The // ListenerDefaultAddr adjusts the defaultAddr which the Listener will use. The
// addr will still be configurable via mcfg regardless of what this is set to. // addr will still be configurable via mcfg regardless of what this is set to.
// The default is ":0". // The default is ":0".
func ListenerAddr(defaultAddr string) ListenerOpt { func ListenerDefaultAddr(defaultAddr string) ListenerOpt {
return func(opts *listenerOpts) { return func(opts *listenerOpts) {
opts.defaultAddr = defaultAddr opts.defaultAddr = defaultAddr
} }
} }
// WithListener returns a Listener which will be initialized when the start // InstListener instantiates a Listener which will be initialized when the Init
// event is triggered on the returned Context (see mrun.Start), and closed when // event is triggered on the given Component, and closed when the Shutdown event
// the stop event is triggered on the returned Context (see mrun.Stop). // is triggered on the returned Component.
func WithListener(ctx context.Context, opts ...ListenerOpt) (context.Context, *Listener) { func InstListener(cmp *mcmp.Component, opts ...ListenerOpt) *Listener {
lOpts := listenerOpts{ lOpts := listenerOpts{
proto: "tcp", proto: "tcp",
defaultAddr: ":0", defaultAddr: ":0",
closeOnShutdown: true,
} }
for _, opt := range opts { for _, opt := range opts {
opt(&lOpts) opt(&lOpts)
} }
l := &Listener{ cmp = cmp.Child("net")
ctx: mctx.NewChild(ctx, "net"), l := &Listener{cmp: cmp}
}
var addr *string addr := mcfg.String(cmp, "listen-addr",
l.ctx, addr = mcfg.WithString(l.ctx, "listen-addr", lOpts.defaultAddr, strings.ToUpper(lOpts.proto)+" address to listen on in format [host]:port. If port is 0 then a random one will be chosen") mcfg.ParamDefault(lOpts.defaultAddr),
mcfg.ParamUsage(
strings.ToUpper(lOpts.proto)+" address to listen on in format "+
"[host]:port. If port is 0 then a random one will be chosen",
),
)
l.ctx = mrun.WithStartHook(l.ctx, func(context.Context) error { mrun.InitHook(cmp, func(context.Context) error {
var err error var err error
l.ctx = mctx.Annotate(l.ctx, cmp.Annotate("proto", lOpts.proto, "addr", *addr)
"proto", lOpts.proto,
"addr", *addr)
if lOpts.isPacketConn() { if lOpts.isPacketConn() {
l.PacketConn, err = net.ListenPacket(lOpts.proto, *addr) l.PacketConn, err = net.ListenPacket(lOpts.proto, *addr)
l.ctx = mctx.Annotate(l.ctx, "addr", l.PacketConn.LocalAddr().String()) cmp.Annotate("addr", l.PacketConn.LocalAddr().String())
} else { } else {
l.Listener, err = net.Listen(lOpts.proto, *addr) l.Listener, err = net.Listen(lOpts.proto, *addr)
l.ctx = mctx.Annotate(l.ctx, "addr", l.Listener.Addr().String()) cmp.Annotate("addr", l.Listener.Addr().String())
} }
if err != nil { if err != nil {
return merr.Wrap(err, l.ctx) return merr.Wrap(err, cmp.Context())
} }
mlog.Info("listening", l.ctx) mlog.From(cmp).Info("listening")
return nil return nil
}) })
// TODO track connections and wait for them to complete before shutting // TODO track connections and wait for them to complete before shutting
// down? // down?
l.ctx = mrun.WithStopHook(l.ctx, func(context.Context) error { mrun.ShutdownHook(cmp, func(context.Context) error {
if l.NoCloseOnStop { if !lOpts.closeOnShutdown {
return nil return nil
} }
mlog.Info("stopping listener", l.ctx) mlog.From(cmp).Info("shutting down listener")
return l.Close() return l.Close()
}) })
return mctx.WithChild(ctx, l.ctx), l return l
} }
// Accept wraps a call to Accept on the underlying net.Listener, providing debug // Accept wraps a call to Accept on the underlying net.Listener, providing debug
@ -121,20 +133,19 @@ func (l *Listener) Accept() (net.Conn, error) {
if err != nil { if err != nil {
return conn, err return conn, err
} }
mlog.Debug("connection accepted", mlog.From(l.cmp).Debug("connection accepted",
mctx.Annotate(l.ctx, "remoteAddr", conn.RemoteAddr().String())) mctx.Annotated("remoteAddr", conn.RemoteAddr().String()))
return conn, nil return conn, nil
} }
// Close wraps a call to Close on the underlying net.Listener, providing debug // Close wraps a call to Close on the underlying net.Listener, providing debug
// logging. // logging.
func (l *Listener) Close() error { func (l *Listener) Close() error {
mlog.Info("listener closing", l.ctx) mlog.From(l.cmp).Info("listener closing")
if l.Listener != nil { if l.Listener != nil {
return l.Listener.Close() return l.Listener.Close()
} else {
return l.PacketConn.Close()
} }
return l.PacketConn.Close()
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -36,9 +36,9 @@ func TestIsReservedIP(t *T) {
} }
func TestWithListener(t *T) { func TestWithListener(t *T) {
ctx := mtest.Context() cmp := mtest.Component()
ctx, l := WithListener(ctx) l := InstListener(cmp)
mtest.Run(ctx, t, func() { mtest.Run(cmp, t, func() {
go func() { go func() {
conn, err := net.Dial("tcp", l.Addr().String()) conn, err := net.Dial("tcp", l.Addr().String())
if err != nil { if err != nil {

View File

@ -54,8 +54,8 @@ func Env(cmp *mcmp.Component, key, val string) {
// - Calls mrun.Shutdown // - Calls mrun.Shutdown
// //
// The intention is that Run is used within a test on a Component created via // The intention is that Run is used within a test on a Component created via
// this package's Component function, after any setup functions have been called // this package's Component function, after any instantiation functions have
// (e.g. mnet.AddListener). // been called (e.g. mnet.InstListener).
func Run(cmp *mcmp.Component, t *testing.T, body func()) { func Run(cmp *mcmp.Component, t *testing.T, body func()) {
if err := mrun.Init(context.Background(), cmp); err != nil { if err := mrun.Init(context.Background(), cmp); err != nil {
t.Fatal(err) t.Fatal(err)