m: implement Log, have mdb.PubSub and mhttp.CfgServer use it, though I'm still not quite happy with it yet
This commit is contained in:
parent
15efa4ba3a
commit
5adbae953b
28
m/m.go
Normal file
28
m/m.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
// Package m is the glue which holds all the other packages in this project
|
||||||
|
// together. While other packages in this project are intended to be able to be
|
||||||
|
// used separately and largely independently, this package combines them in ways
|
||||||
|
// which I specifically like.
|
||||||
|
package m
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Log returns a Logger which will automatically include with the log extra
|
||||||
|
// contextual information based on the Cfg and the given KVs
|
||||||
|
//
|
||||||
|
// If the cfg is nil then mlog.DefaultLogger is returned.
|
||||||
|
func Log(cfg *mcfg.Cfg, kvs ...mlog.KVer) *mlog.Logger {
|
||||||
|
fn := cfg.FullName()
|
||||||
|
l := mlog.DefaultLogger.WithWriteFn(func(w io.Writer, msg mlog.Message) error {
|
||||||
|
msg.Msg = fn + " " + msg.Msg
|
||||||
|
return mlog.DefaultWriteFn(w, msg)
|
||||||
|
})
|
||||||
|
if len(kvs) > 0 {
|
||||||
|
l = l.WithKV(kvs...)
|
||||||
|
}
|
||||||
|
return l
|
||||||
|
}
|
16
mdb/ps.go
16
mdb/ps.go
@ -7,6 +7,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"cloud.google.com/go/pubsub"
|
"cloud.google.com/go/pubsub"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/m"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
||||||
oldctx "golang.org/x/net/context"
|
oldctx "golang.org/x/net/context"
|
||||||
@ -31,6 +32,8 @@ type Message = pubsub.Message
|
|||||||
type PubSub struct {
|
type PubSub struct {
|
||||||
proj, credFile string
|
proj, credFile string
|
||||||
*pubsub.Client
|
*pubsub.Client
|
||||||
|
|
||||||
|
log *mlog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPubSub initializes and returns PubSub instance for the given projectID and
|
// NewPubSub initializes and returns PubSub instance for the given projectID and
|
||||||
@ -40,9 +43,10 @@ func NewPubSub(ctx context.Context, projectID, credFile string) (*PubSub, error)
|
|||||||
proj: projectID,
|
proj: projectID,
|
||||||
credFile: credFile,
|
credFile: credFile,
|
||||||
}
|
}
|
||||||
|
ps.log = mlog.DefaultLogger.WithKV(ps.KV())
|
||||||
var err error
|
var err error
|
||||||
ps.Client, err = pubsub.NewClient(ctx, ps.proj, ps.clientOpts()...)
|
ps.Client, err = pubsub.NewClient(ctx, ps.proj, ps.clientOpts()...)
|
||||||
return ps, err
|
return ps, mlog.ErrWithKV(err, ps)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CfgPubSub configures and returns a PubSub instance which will be usable once
|
// CfgPubSub configures and returns a PubSub instance which will be usable once
|
||||||
@ -59,12 +63,14 @@ func CfgPubSub(cfg *mcfg.Cfg, defaultProject string) *PubSub {
|
|||||||
cfg.Start.Then(func(ctx context.Context) error {
|
cfg.Start.Then(func(ctx context.Context) error {
|
||||||
ps.proj = *proj
|
ps.proj = *proj
|
||||||
ps.credFile = *credFile
|
ps.credFile = *credFile
|
||||||
mlog.Info("connecting to pubsub", &ps)
|
log := m.Log(cfg, ps.KV())
|
||||||
|
log.Info("connecting to pubsub")
|
||||||
psInner, err := NewPubSub(ctx, *proj, *credFile)
|
psInner, err := NewPubSub(ctx, *proj, *credFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mlog.ErrWithKV(err, &ps)
|
return mlog.ErrWithKV(err, &ps)
|
||||||
}
|
}
|
||||||
ps = *psInner
|
ps = *psInner
|
||||||
|
ps.log = log
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return &ps
|
return &ps
|
||||||
@ -235,7 +241,7 @@ func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts Consum
|
|||||||
|
|
||||||
ok, err := fn(context.Context(innerCtx), msg)
|
ok, err := fn(context.Context(innerCtx), msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mlog.Warn("error consuming pubsub message", s, mlog.ErrKV(err))
|
s.topic.ps.log.Warn("error consuming pubsub message", s, mlog.ErrKV(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
@ -247,7 +253,7 @@ func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts Consum
|
|||||||
if octx.Err() == context.Canceled || err == nil {
|
if octx.Err() == context.Canceled || err == nil {
|
||||||
return
|
return
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
mlog.Warn("error consuming from pubsub", s, mlog.ErrKV(err))
|
s.topic.ps.log.Warn("error consuming from pubsub", s, mlog.ErrKV(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -339,7 +345,7 @@ func (s *Subscription) BatchConsume(
|
|||||||
}
|
}
|
||||||
ret, err := fn(thisCtx, msgs)
|
ret, err := fn(thisCtx, msgs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mlog.Warn("error consuming pubsub batch messages", s, mlog.ErrKV(err))
|
s.topic.ps.log.Warn("error consuming pubsub batch messages", s, mlog.ErrKV(err))
|
||||||
}
|
}
|
||||||
for i := range thisGroup {
|
for i := range thisGroup {
|
||||||
thisGroup[i].retCh <- ret // retCh is buffered
|
thisGroup[i].retCh <- ret // retCh is buffered
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/m"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
||||||
)
|
)
|
||||||
@ -21,12 +22,13 @@ func CfgServer(cfg *mcfg.Cfg, h http.Handler) *http.Server {
|
|||||||
cfg.Start.Then(func(ctx context.Context) error {
|
cfg.Start.Then(func(ctx context.Context) error {
|
||||||
srv.Addr = *addr
|
srv.Addr = *addr
|
||||||
kv := mlog.KV{"addr": *addr}
|
kv := mlog.KV{"addr": *addr}
|
||||||
mlog.Info("HTTP server listening", kv)
|
log := m.Log(cfg, kv)
|
||||||
|
log.Info("listening")
|
||||||
go func() {
|
go func() {
|
||||||
err := srv.ListenAndServe()
|
err := srv.ListenAndServe()
|
||||||
// TODO the listening log should happen here, somehow, now that we
|
// TODO the listening log should happen here, somehow, now that we
|
||||||
// know the actual address being listened on
|
// know the actual address being listened on
|
||||||
mlog.Fatal("http server fataled", kv, mlog.ErrKV(err))
|
log.Fatal("http server fataled", mlog.ErrKV(err))
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user